基于云端OpenClaw的情绪互动机器人系统-Milk-V Duo S端动作调度状态机与安全校验机制设计(8)
前几周的工作中,我主要围绕 Milk-V Duo S 端开发环境、WiFi 自动连接、串口通信、云端指令轮询、动作组执行以及复杂动作组设计展开。到目前为止,机器人端已经能够完成一个基本闭环:云端 OpenClaw Agent 根据用户输入生成动作序列,Webhook 服务把动作序列存入队列,Milk-V Duo S 端通过 HTTP 轮询获取动作编号,然后通过串口控制机器人执行对应动作组。
但是在继续调试过程中,我发现如果 Duo S 端只是简单地写成:
while True:
result = poll_action()
if result:
run_sequence(result["action_sequence"])
ack_action(result["command_id"])
虽然能跑通演示,但工程上还不够可靠。因为机器人端不是普通软件程序,它最终控制的是 17 个自由度舵机。一旦动作序列错误、动作正在执行时又收到新任务、网络断开、串口异常、复杂动作连续执行,都可能导致机器人姿态失控或者动作表现不稳定。
因此本周我在 Milk-V Duo S 端设计了两个新的核心模块:
- 动作调度状态机模块
- 动作安全校验与风险分级模块
这两个模块的目标是把 Duo S 端从“能执行动作”升级为“能够安全、稳定、有状态地执行动作”。
一、为什么要引入动作调度状态机
在最初版本中,Duo S 端的逻辑比较直接:
收到动作序列 -> 执行动作 -> 等待执行完成 -> 回传 ACK
这种方式的问题是:程序内部并没有明确记录机器人当前处于什么状态。
例如:
- 机器人是否正在轮询云端?
- 是否已经收到动作但还没执行?
- 是否正在执行复杂动作组?
- 当前动作是否通过安全校验?
- ACK 是否已经成功回传?
- 如果执行失败,是否进入安全回正?
这些状态如果都混在一个 while True 循环中,后期调试会非常困难。因此我将 Duo S 端抽象成一个状态机。
状态机的核心思想是:机器人端在任意时刻都只处于一个明确状态,并且状态之间按照固定流程切换。
本项目中我设计的状态如下:
| 状态 | 含义 |
|---|---|
| IDLE | 空闲状态,机器人等待任务 |
| POLLING | 正在向云端轮询动作任务 |
| RECEIVED | 已收到云端任务 |
| VALIDATING | 正在校验动作序列 |
| RUNNING | 正在执行动作序列 |
| ACKING | 正在向云端回传执行结果 |
| ERROR | 执行或通信出现异常 |
| RECOVERING | 异常后执行安全回正 |
状态流转关系如下:
IDLE
↓
POLLING
↓
RECEIVED
↓
VALIDATING
↓
RUNNING
↓
ACKING
↓
IDLE
如果中途异常:
ERROR → RECOVERING → IDLE
技术理解:
机器人动作执行具有明显的时间属性。一个动作组不是瞬间完成的,而是需要几秒甚至几十秒。在这段时间里,串口不能被其他动作序列抢占。因此 Duo S 端必须知道自己是否正在执行动作。状态机相当于给机器人端建立了一套“任务生命周期管理机制”,让每一次动作执行都可追踪、可控制、可恢复。
二、使用 AI 辅助生成状态枚举代码
提示词 1:生成机器人端状态枚举
请帮我生成一个运行在 Milk-V Duo S 开发板上的 Python 状态枚举模块,用于基于云端 OpenClaw 的情绪互动机器人项目。
要求:
1. 使用 Python 标准库 enum.Enum。
2. 定义机器人端执行状态 RobotState。
3. 状态包括:
- IDLE:空闲
- POLLING:正在轮询云端
- RECEIVED:已收到任务
- VALIDATING:正在校验动作序列
- RUNNING:正在执行动作
- ACKING:正在回传 ACK
- ERROR:异常状态
- RECOVERING:安全恢复状态
4. 每个枚举值使用字符串,方便打印日志。
5. 代码要简洁清晰,可以直接保存为 robot_state.py。
AI 生成代码:robot_state.py
from enum import Enum
class RobotState(Enum):
"""
Milk-V Duo S 端机器人执行状态枚举。
每一个状态表示机器人端任务生命周期中的一个阶段。
"""
IDLE = "IDLE" # 空闲,等待任务
POLLING = "POLLING" # 正在轮询云端
RECEIVED = "RECEIVED" # 已收到动作任务
VALIDATING = "VALIDATING" # 正在校验动作序列
RUNNING = "RUNNING" # 正在执行动作序列
ACKING = "ACKING" # 正在回传执行确认
ERROR = "ERROR" # 执行或通信异常
RECOVERING = "RECOVERING" # 异常后安全恢复
技术理解
这里没有直接使用普通字符串,而是使用 Enum 枚举。这样做的好处是:
- 避免字符串拼写错误;
- 状态值统一管理;
- 日志打印更加清晰;
- 后期增加新状态更方便;
- 代码可读性更强。
比如如果手写:
state = "RUNING"
少写一个 N 程序也不会报错,但状态判断会失败。而枚举可以避免这类问题。
三、动作元数据与风险分级设计
原来 Duo S 端只有一个简单的动作时长表:
ACTION_DURATION = {
0: 0,
1: 12,
2: 10,
3: 10,
4: 9,
5: 15,
6: 72,
7: 20,
8: 10,
9: 13,
10: 12,
11: 15,
12: 15,
13: 12,
14: 16,
15: 18,
16: 20
}
这种方式只能告诉程序“动作要等几秒”,但是不能告诉程序这个动作是否危险、是否需要执行前站立、是否需要执行后回正。
随着我们新增了 11-前进招手、12-鞠躬谢谢、13-下蹲保护 这类复杂手脚协同动作,简单时长表已经不够用了。
因此我将动作库升级为“动作元数据表”。
每个动作不仅有编号,还有:
- 动作名称
- 预计执行时间
- 风险等级
- 是否需要动作前站立
- 是否需要动作后回正
- 是否属于复杂动作
四、使用 AI 辅助生成动作元数据模块
提示词 2:生成动作元数据表
请帮我生成一个 Python 模块 action_meta.py,用于 Milk-V Duo S 端机器人动作库管理。
项目背景:
机器人通过云端 OpenClaw Agent 获取动作编号序列,Milk-V Duo S 端根据动作编号控制机器人执行本地动作组。现在需要把简单的 ACTION_DURATION 升级为动作元数据表。
要求:
1. 定义动作风险等级:
- LOW
- MEDIUM
- HIGH
2. 使用字典 ACTION_META 管理动作信息。
3. 每个动作包含:
- name:动作名称
- duration:预计执行时间,单位秒
- risk:风险等级
- need_stand_before:执行前是否需要站立
- need_stand_after:执行后是否需要回正
- complex:是否为复杂动作组
4. 包含动作编号:
0 站立
1 挥手
2 摇头
3 左右致谢
4 交通指挥
5 舞蹈
6 自由飞翔
7 下蹲
8 前进
9 后退
10 普通动作
11 前进招手
12 鞠躬谢谢
13 下蹲保护
16 左招手
17 右招手
5. 提供函数:
- get_action_meta(action_id)
- get_action_duration(action_id)
- is_high_risk(action_id)
- need_stand_before(action_id)
- need_stand_after(action_id)
6. 代码要能在嵌入式 Linux 的 Python 环境中运行,不依赖第三方库。
AI 生成代码:action_meta.py
from enum import Enum
class RiskLevel(Enum):
"""
动作风险等级。
LOW:普通动作,风险较低。
MEDIUM:需要一定姿态控制的动作。
HIGH:涉及大幅度重心变化或手脚协同的复杂动作。
"""
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
ACTION_META = {
0: {
"name": "站立",
"duration": 5,
"risk": RiskLevel.LOW,
"need_stand_before": False,
"need_stand_after": False,
"complex": False
},
1: {
"name": "挥手",
"duration": 12,
"risk": RiskLevel.LOW,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
2: {
"name": "摇头",
"duration": 10,
"risk": RiskLevel.LOW,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
3: {
"name": "左右致谢",
"duration": 10,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
4: {
"name": "交通指挥",
"duration": 9,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
5: {
"name": "舞蹈",
"duration": 15,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
6: {
"name": "自由飞翔",
"duration": 72,
"risk": RiskLevel.HIGH,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
7: {
"name": "下蹲",
"duration": 20,
"risk": RiskLevel.HIGH,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
8: {
"name": "前进",
"duration": 10,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
9: {
"name": "后退",
"duration": 13,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
10: {
"name": "普通动作",
"duration": 12,
"risk": RiskLevel.LOW,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
11: {
"name": "前进招手",
"duration": 22,
"risk": RiskLevel.HIGH,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
12: {
"name": "鞠躬谢谢",
"duration": 8,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
13: {
"name": "下蹲保护",
"duration": 8,
"risk": RiskLevel.HIGH,
"need_stand_before": True,
"need_stand_after": True,
"complex": True
},
16: {
"name": "左招手",
"duration": 4,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
},
17: {
"name": "右招手",
"duration": 4,
"risk": RiskLevel.MEDIUM,
"need_stand_before": True,
"need_stand_after": True,
"complex": False
}
}
def get_action_meta(action_id):
"""
获取动作元数据。
如果动作编号不存在,返回 None。
"""
return ACTION_META.get(action_id)
def get_action_duration(action_id):
"""
获取动作预计执行时间。
如果动作不存在,则返回默认 3 秒,避免程序直接崩溃。
"""
meta = get_action_meta(action_id)
if meta is None:
return 3
return meta["duration"]
def is_high_risk(action_id):
"""
判断动作是否为高风险动作。
"""
meta = get_action_meta(action_id)
if meta is None:
return False
return meta["risk"] == RiskLevel.HIGH
def need_stand_before(action_id):
"""
判断动作执行前是否需要先站立。
"""
meta = get_action_meta(action_id)
if meta is None:
return False
return meta["need_stand_before"]
def need_stand_after(action_id):
"""
判断动作执行后是否需要回到站立。
"""
meta = get_action_meta(action_id)
if meta is None:
return False
return meta["need_stand_after"]
def get_action_name(action_id):
"""
获取动作名称,主要用于日志打印。
"""
meta = get_action_meta(action_id)
if meta is None:
return "未知动作"
return meta["name"]
技术理解
动作编号本身没有任何语义。比如 11 对 Python 来说只是一个整数,但是在机器人系统中,它代表的是一个包含前进、招手、重心变化和回正过程的复杂动作组。
所以我把动作编号升级为动作元数据后,Duo S 端就可以根据动作属性做出不同策略:
- 普通动作可以直接执行;
- 复杂动作前先站立;
- 高风险动作后必须回正;
- 未知动作直接拒绝执行;
- 长动作需要更长等待时间;
- 连续高风险动作需要限制。
这样端侧就不再只是被动执行云端编号,而是具备了一定的安全判断能力。
五、动作序列安全校验设计
云端 OpenClaw Agent 在生成动作序列时已经设置了动作白名单和 normalize 兜底校验,但是我认为 Duo S 端仍然必须再校验一次。
原因是:云端出错最多是 JSON 不规范,而 Duo S 端出错会直接控制舵机运动。
因此我设计了端侧动作序列校验规则:
| 校验项目 | 作用 |
|---|---|
| 是否为 list | 防止云端返回格式错误 |
| 是否为空 | 防止空动作执行 |
| 长度是否过长 | 防止机器人连续运行过久 |
| 动作编号是否合法 | 防止未知动作编号 |
| 高风险动作是否过多 | 防止连续复杂动作造成不稳定 |
| 是否自动补站立动作 | 保证执行前后安全 |
六、使用 AI 辅助生成动作校验模块
提示词 3:生成动作序列安全校验代码
请帮我生成一个 Python 模块 action_validator.py,用于 Milk-V Duo S 机器人端动作序列安全校验。
背景:
云端 OpenClaw Agent 会返回 action_sequence,例如 [11, 12, 0]。Milk-V Duo S 端需要在真正执行舵机动作之前进行二次校验。
要求:
1. 从 action_meta.py 中导入 ACTION_META、is_high_risk、need_stand_before、need_stand_after。
2. 设计 validate_sequence(sequence) 函数。
3. 校验规则:
- sequence 必须是 list
- sequence 不能为空
- sequence 长度不能超过 MAX_SEQUENCE_LENGTH
- 每个 action_id 必须在 ACTION_META 中
- 连续高风险动作不能超过 MAX_HIGH_RISK_COUNT
4. 设计 normalize_sequence(sequence) 函数。
- 如果第一个动作需要站立前置,则自动在前面插入 0
- 如果最后一个动作需要站立回正,则自动在最后插入 0
- 避免重复插入连续的 0
5. 设计 remove_duplicate_stand(sequence) 函数,去除连续站立动作。
6. 返回值要包含是否通过、错误原因、规范化后的动作序列。
7. 不依赖第三方库。
AI 生成代码:action_validator.py
from action_meta import ACTION_META
from action_meta import is_high_risk
from action_meta import need_stand_before
from action_meta import need_stand_after
MAX_SEQUENCE_LENGTH = 8
MAX_HIGH_RISK_COUNT = 2
STAND_ACTION_ID = 0
def remove_duplicate_stand(sequence):
"""
去除连续重复的站立动作。
例如 [0, 0, 11, 0, 0] 会变成 [0, 11, 0]。
"""
if not sequence:
return sequence
cleaned = []
last_action = None
for action in sequence:
if action == STAND_ACTION_ID and last_action == STAND_ACTION_ID:
continue
cleaned.append(action)
last_action = action
return cleaned
def normalize_sequence(sequence):
"""
对动作序列进行规范化处理。
主要包括:
1. 高风险或复杂动作前自动补站立;
2. 高风险或复杂动作后自动回正;
3. 删除连续重复的站立动作。
"""
if not sequence:
return sequence
normalized = list(sequence)
first_action = normalized[0]
last_action = normalized[-1]
# 如果第一个动作需要前置站立,并且当前不是站立,则自动补 0。
if need_stand_before(first_action) and first_action != STAND_ACTION_ID:
normalized.insert(0, STAND_ACTION_ID)
# 如果最后一个动作需要执行后回正,并且当前最后不是站立,则自动补 0。
if need_stand_after(last_action) and last_action != STAND_ACTION_ID:
normalized.append(STAND_ACTION_ID)
normalized = remove_duplicate_stand(normalized)
return normalized
def validate_sequence(sequence):
"""
校验云端返回的动作序列是否合法。
返回:
{
"ok": True 或 False,
"reason": "校验结果说明",
"sequence": 规范化后的动作序列
}
"""
if not isinstance(sequence, list):
return {
"ok": False,
"reason": "动作序列必须是 list 类型",
"sequence": []
}
if len(sequence) == 0:
return {
"ok": False,
"reason": "动作序列不能为空",
"sequence": []
}
if len(sequence) > MAX_SEQUENCE_LENGTH:
return {
"ok": False,
"reason": f"动作序列过长,最大长度为 {MAX_SEQUENCE_LENGTH}",
"sequence": []
}
high_risk_count = 0
for action_id in sequence:
if not isinstance(action_id, int):
return {
"ok": False,
"reason": f"动作编号必须是整数,当前值为 {action_id}",
"sequence": []
}
if action_id not in ACTION_META:
return {
"ok": False,
"reason": f"非法动作编号:{action_id}",
"sequence": []
}
if is_high_risk(action_id):
high_risk_count += 1
if high_risk_count > MAX_HIGH_RISK_COUNT:
return {
"ok": False,
"reason": f"高风险动作过多,最多允许 {MAX_HIGH_RISK_COUNT} 个",
"sequence": []
}
normalized = normalize_sequence(sequence)
return {
"ok": True,
"reason": "动作序列校验通过",
"sequence": normalized
}
技术理解
这段代码最核心的地方不是判断 list,而是对机器人动作风险进行限制。
例如云端如果返回:
[11, 13, 6, 7]
虽然每个动作编号都存在,但这个序列对机器人来说风险很高。因为:
11是前进招手,涉及手脚协同;13是下蹲保护,涉及低重心;6是自由飞翔,执行时间长;7是下蹲,涉及大幅姿态变化。
如果连续执行这类动作,机器人容易出现姿态累积误差。因此端侧必须限制高风险动作数量。
另一个重要设计是自动补站立动作。例如云端返回:
[11, 12]
端侧会自动规范化成:
[0, 11, 12, 0]
这样可以保证复杂动作前先回到标准站立,复杂动作后也回到标准站立。
七、机器人串口控制模块优化
我原来的 RobotController 已经可以通过串口发送动作指令:
PL0 SQ1 SM100 ONCE
其中:
PL0表示动作组区域;SQ1表示执行 1 号动作组;SM100表示动作速度;ONCE表示执行一次。
但是状态机版本需要更安全的串口控制,因此我加入了:
- 串口互斥锁;
- 动作执行函数;
- 安全停止函数;
- 默认速度配置;
- 串口异常捕获。
八、使用 AI 辅助生成串口控制模块
提示词 4:生成安全串口控制模块
请帮我生成一个 Python 模块 robot_controller.py,用于 Milk-V Duo S 通过串口控制机器人动作组。
项目背景:
机器人动作组通过 32 路舵机控制板执行,Milk-V Duo S 使用 /dev/ttyS0 串口发送动作组命令,波特率 115200。动作指令格式为:
PL0 SQ{action_id} SM{speed} ONCE
要求:
1. 使用 pyserial。
2. 定义 RobotController 类。
3. 初始化参数包括 port="/dev/ttyS0"、baudrate=115200、default_speed=100。
4. 提供 send_cmd(cmd) 方法,自动添加 \r\n。
5. 提供 run_action(action_id, speed=None) 方法。
6. 提供 stand() 方法,执行 0 号站立动作。
7. 提供 stop() 方法,发送 PL0 停止动作。
8. 提供 close() 方法关闭串口。
9. 使用 threading.Lock 保证同一时间只有一个线程写串口。
10. 代码要有异常处理和打印日志,适合在嵌入式 Linux 上运行。
AI 生成代码:robot_controller.py
import time
import threading
import serial
class RobotController:
"""
Milk-V Duo S 端机器人串口控制类。
负责把动作编号转换为舵机控制板可以识别的串口指令。
"""
def __init__(self, port="/dev/ttyS0", baudrate=115200, default_speed=100):
self.port = port
self.baudrate = baudrate
self.default_speed = default_speed
self.lock = threading.Lock()
try:
self.ser = serial.Serial(port, baudrate, timeout=1)
time.sleep(1)
print(f"[RobotController] 串口已打开: {port}, baudrate={baudrate}")
except Exception as e:
print(f"[RobotController] 串口打开失败: {e}")
raise e
def send_cmd(self, cmd):
"""
发送原始串口命令。
"""
full_cmd = cmd + "\r\n"
with self.lock:
try:
self.ser.write(full_cmd.encode("utf-8"))
print(f"[RobotController] 发送: {cmd}")
return True
except Exception as e:
print(f"[RobotController] 串口发送失败: {e}")
return False
def run_action(self, action_id, speed=None):
"""
执行动作组。
"""
if speed is None:
speed = self.default_speed
cmd = f"PL0 SQ{action_id} SM{speed} ONCE"
return self.send_cmd(cmd)
def stand(self):
"""
执行 0 号站立动作,作为安全回正。
"""
print("[RobotController] 执行站立回正")
return self.run_action(0, speed=self.default_speed)
def stop(self):
"""
停止当前动作。
"""
print("[RobotController] 停止动作")
return self.send_cmd("PL0")
def close(self):
"""
关闭串口。
"""
try:
self.ser.close()
print("[RobotController] 串口已关闭")
except Exception as e:
print(f"[RobotController] 串口关闭失败: {e}")
技术理解
这里最关键的是 threading.Lock()。
虽然当前主程序大多数时候是单线程运行,但后续如果加入网络监听、日志线程、异常恢复线程,就可能出现多个地方同时调用串口的情况。串口是典型的独占资源,如果多个动作命令同时写入,可能出现命令交错,导致舵机控制板接收到错误指令。
因此我在 send_cmd() 中加锁,保证同一时刻只有一条命令写入串口。
九、动作调度器设计
有了状态枚举、动作元数据、安全校验和串口控制之后,还需要一个统一的调度器负责把它们组合起来。
动作调度器的职责是:
- 接收云端动作序列;
- 切换到校验状态;
- 调用校验模块;
- 执行动作序列;
- 根据动作时长等待;
- 发生异常时进入恢复状态;
- 执行结束后回到 IDLE。
十、使用 AI 辅助生成动作调度器
提示词 5:生成状态机动作调度器
请帮我生成一个 Python 模块 action_scheduler.py,用于 Milk-V Duo S 端机器人动作调度。
项目背景:
云端 OpenClaw Agent 返回动作序列,例如 [11, 12, 0]。Milk-V Duo S 端需要先校验动作序列,再通过串口执行动作组,并在异常时进行安全回正。
要求:
1. 导入 RobotState。
2. 导入 action_validator.validate_sequence。
3. 导入 action_meta.get_action_duration、get_action_name、is_high_risk。
4. 定义 ActionScheduler 类。
5. 初始化时传入 RobotController 对象。
6. 内部维护 state 状态,初始为 IDLE。
7. 提供 set_state(new_state) 方法,打印状态切换日志。
8. 提供 run_sequence(sequence) 方法:
- 切换到 VALIDATING
- 校验动作序列
- 校验失败则进入 ERROR 和 RECOVERING
- 校验成功后切换到 RUNNING
- 逐个执行动作
- 根据动作时长 sleep
- 高风险动作执行前后打印提示
- 执行完成后切换回 IDLE
9. 提供 recover() 方法:
- 切换到 RECOVERING
- 停止当前动作
- 执行站立动作
- 等待 5 秒
- 回到 IDLE
10. 使用 running 标志位防止重复执行。
11. 代码要详细、清晰、有注释。
AI 生成代码:action_scheduler.py
import time
from robot_state import RobotState
from action_validator import validate_sequence
from action_meta import get_action_duration
from action_meta import get_action_name
from action_meta import is_high_risk
class ActionScheduler:
"""
Milk-V Duo S 端动作调度状态机。
负责动作序列的校验、执行、等待和异常恢复。
"""
def __init__(self, robot):
self.robot = robot
self.state = RobotState.IDLE
self.running = False
def set_state(self, new_state):
"""
切换机器人状态,并打印状态变化。
"""
old_state = self.state
self.state = new_state
print(f"[Scheduler] 状态切换: {old_state.value} -> {new_state.value}")
def run_sequence(self, sequence):
"""
执行动作序列。
"""
if self.running:
print("[Scheduler] 当前已有动作正在执行,拒绝新序列")
return {
"ok": False,
"reason": "机器人正在执行上一条动作序列"
}
self.running = True
try:
self.set_state(RobotState.VALIDATING)
result = validate_sequence(sequence)
if not result["ok"]:
print(f"[Scheduler] 动作序列校验失败: {result['reason']}")
self.set_state(RobotState.ERROR)
self.recover()
return {
"ok": False,
"reason": result["reason"]
}
safe_sequence = result["sequence"]
print(f"[Scheduler] 校验通过,规范化动作序列: {safe_sequence}")
self.set_state(RobotState.RUNNING)
for action_id in safe_sequence:
action_name = get_action_name(action_id)
duration = get_action_duration(action_id)
if is_high_risk(action_id):
print(f"[Scheduler] 即将执行高风险动作: {action_id} - {action_name}")
print(f"[Scheduler] 执行动作: {action_id} - {action_name}, 预计 {duration} 秒")
ok = self.robot.run_action(action_id)
if not ok:
raise RuntimeError(f"动作 {action_id} 串口发送失败")
time.sleep(duration)
print(f"[Scheduler] 动作完成: {action_id} - {action_name}")
self.set_state(RobotState.IDLE)
print("[Scheduler] 动作序列执行完成")
return {
"ok": True,
"reason": "动作序列执行完成"
}
except Exception as e:
print(f"[Scheduler] 执行异常: {e}")
self.set_state(RobotState.ERROR)
self.recover()
return {
"ok": False,
"reason": str(e)
}
finally:
self.running = False
def recover(self):
"""
异常恢复流程。
无论发生什么错误,都尽量让机器人停止并回到站立姿态。
"""
self.set_state(RobotState.RECOVERING)
try:
self.robot.stop()
time.sleep(1)
self.robot.stand()
time.sleep(5)
print("[Scheduler] 安全回正完成")
except Exception as e:
print(f"[Scheduler] 安全恢复失败: {e}")
finally:
self.set_state(RobotState.IDLE)
技术理解
动作调度器是本篇博客最核心的模块。它把之前分散在不同文件里的逻辑整合成一个完整流程:
动作序列输入
↓
动作安全校验
↓
动作序列规范化
↓
逐个动作执行
↓
根据动作元数据等待
↓
异常时安全回正
其中 running 标志位非常重要。它可以防止机器人在一个动作还没执行完时又执行新动作。例如,机器人正在执行 11-前进招手,如果云端又发来 13-下蹲保护,直接执行会导致两个动作在姿态上冲突。通过 running 可以拒绝新序列,保证动作执行的完整性。
recover() 函数也很关键。机器人系统不能像普通程序一样出错后直接退出,因为退出时机器人可能停留在半下蹲、半抬手、前倾等不稳定姿态。异常恢复模块会先发送停止命令,再执行 0 号站立动作,让机器人尽量回到安全状态。
十一、云端轮询与状态机集成
原来的 Duo S 端云端客户端主要负责三件事:
- 轮询
/poll/{client_id}; - 执行动作序列;
- 调用
/ack/{client_id}回传确认。
现在加入状态机后,主程序的结构更加清晰:
POLLING:轮询云端
RECEIVED:收到任务
VALIDATING/RUNNING:交给 ActionScheduler
ACKING:回传执行结果
IDLE:等待下一轮
十二、使用 AI 辅助生成主程序集成代码
提示词 6:生成状态机主程序
请帮我生成一个 Python 主程序 main.py,用于 Milk-V Duo S 端连接云端 OpenClaw Webhook 服务并执行机器人动作。
项目背景:
云端服务提供:
GET /poll/{client_id}
POST /ack/{client_id}
Milk-V Duo S 端需要周期性轮询云端,如果收到 action_sequence,则交给 ActionScheduler 执行,执行完成后回传 ACK。
要求:
1. 导入 requests、time。
2. 导入 RobotState、RobotController、ActionScheduler。
3. 配置 SERVER、CLIENT_ID、POLL_INTERVAL。
4. 定义 poll_action():
- GET 请求云端 poll 接口
- timeout=5
- 异常时返回 None
5. 定义 ack_action(command_id, execute_result):
- POST 请求云端 ack 接口
- 回传 command_id、status、reason
6. main() 函数中:
- 初始化 RobotController
- 初始化 ActionScheduler
- 使用 last_command_id 防止重复执行
- 状态切换包括 POLLING、RECEIVED、ACKING、IDLE
- 收到 action_sequence 后执行 scheduler.run_sequence(sequence)
- 执行后回传 ACK
7. KeyboardInterrupt 时停止机器人并关闭串口。
8. 代码要适合在 Milk-V Duo S 的 Python 环境中运行。
AI 生成代码:main.py
import time
import requests
from robot_state import RobotState
from robot_controller import RobotController
from action_scheduler import ActionScheduler
SERVER = "http://47.93.27.196:8765"
CLIENT_ID = "milk_duos_001"
POLL_INTERVAL = 2
def poll_action():
"""
从云端轮询动作任务。
"""
url = f"{SERVER}/poll/{CLIENT_ID}"
try:
response = requests.get(url, timeout=5)
return response.json()
except Exception as e:
print(f"[Cloud] 轮询失败: {e}")
return None
def ack_action(command_id, execute_result):
"""
向云端回传动作执行结果。
"""
url = f"{SERVER}/ack/{CLIENT_ID}"
status = "done" if execute_result.get("ok") else "failed"
data = {
"command_id": command_id,
"client_id": CLIENT_ID,
"status": status,
"reason": execute_result.get("reason")
}
try:
requests.post(url, json=data, timeout=5)
print(f"[Cloud] ACK 已回传: {data}")
return True
except Exception as e:
print(f"[Cloud] ACK 回传失败: {e}")
return False
def main():
"""
Milk-V Duo S 端主循环。
"""
print("[Main] Duo S 端机器人客户端启动")
robot = RobotController()
scheduler = ActionScheduler(robot)
last_command_id = None
try:
while True:
scheduler.set_state(RobotState.POLLING)
result = poll_action()
if result is None:
scheduler.set_state(RobotState.IDLE)
time.sleep(POLL_INTERVAL)
continue
if result.get("status") == "no_action":
scheduler.set_state(RobotState.IDLE)
time.sleep(POLL_INTERVAL)
continue
if result.get("action_sequence"):
scheduler.set_state(RobotState.RECEIVED)
command_id = result.get("command_id")
sequence = result.get("action_sequence")
if command_id == last_command_id:
print(f"[Main] 重复 command_id,跳过执行: {command_id}")
scheduler.set_state(RobotState.IDLE)
time.sleep(POLL_INTERVAL)
continue
print(f"[Main] 收到云端动作任务: command_id={command_id}, sequence={sequence}")
execute_result = scheduler.run_sequence(sequence)
scheduler.set_state(RobotState.ACKING)
ack_action(command_id, execute_result)
last_command_id = command_id
scheduler.set_state(RobotState.IDLE)
else:
print(f"[Main] 未知云端响应: {result}")
scheduler.set_state(RobotState.IDLE)
time.sleep(POLL_INTERVAL)
except KeyboardInterrupt:
print("[Main] 手动停止程序")
robot.stop()
robot.close()
if __name__ == "__main__":
main()
技术理解
这里保留了 last_command_id 防重复执行机制。因为 HTTP 轮询模式下,可能出现一种情况:Duo S 已经执行了动作,但 ACK 回传失败,云端下一次仍然返回同一个 command_id。如果没有防重复机制,机器人会重复执行同一个动作。
因此 last_command_id 是一个很重要的端侧保护:
if command_id == last_command_id:
print("重复 command_id,跳过执行")
状态机主程序的好处是,每一个阶段都能从日志中看出来:
IDLE -> POLLING
POLLING -> RECEIVED
RECEIVED -> VALIDATING
VALIDATING -> RUNNING
RUNNING -> ACKING
ACKING -> IDLE
这样调试时如果机器人没有动,我可以很快判断问题出在哪里:
- 停在 POLLING:可能是网络问题;
- 停在 RECEIVED:可能是任务格式问题;
- 停在 VALIDATING:可能是动作编号不合法;
- 停在 RUNNING:可能是串口或动作执行问题;
- 停在 ACKING:可能是云端确认接口问题。
十三、完整文件结构设计
经过本次重构后,Milk-V Duo S 端代码结构可以设计成:
milkv-Duo-S/
├── main.py # 主程序入口,负责云端轮询和整体状态切换
├── robot_state.py # 机器人端状态枚举
├── action_meta.py # 动作元数据和风险分级
├── action_validator.py # 动作序列安全校验和规范化
├── robot_controller.py # 串口控制封装
├── action_scheduler.py # 动作调度状态机
├── sequence_control.py # 旧版动作序列执行模块
├── test_servo.py # 本地串口测试
└── README.md # 端侧运行说明
技术理解:
这种结构比之前所有逻辑都放在一个文件中更清晰。每个文件只负责一个功能:
robot_state.py只负责状态定义;action_meta.py只负责动作信息;action_validator.py只负责安全校验;robot_controller.py只负责串口发送;action_scheduler.py只负责动作调度;main.py只负责端云通信主循环。
这体现了软件工程中的模块化思想。模块化以后,后期如果要增加新的复杂动作组,只需要修改 action_meta.py;如果要调整安全规则,只需要修改 action_validator.py;如果要更换通信方式,也不会影响底层动作执行逻辑。
十四、端侧安全机制总结
本次设计后,Duo S 端具备了多层安全机制:
| 安全机制 | 作用 |
|---|---|
| 动作白名单 | 防止未知动作编号执行 |
| 序列长度限制 | 防止动作过长导致机器人连续运行 |
| 高风险动作数量限制 | 防止连续复杂动作造成失稳 |
| 自动前置站立 | 保证复杂动作从标准姿态开始 |
| 自动后置站立 | 保证动作结束后安全回正 |
| 串口互斥锁 | 防止多线程同时写串口 |
| running 标志位 | 防止动作序列并发执行 |
| last_command_id | 防止云端重复任务被重复执行 |
| recover 异常恢复 | 出错后尽量停止并站立 |
| 状态机日志 | 方便定位问题发生在哪个阶段 |
十五、本周总结
本周我将原本较简单的 Duo S 端动作执行逻辑升级成了“动作调度状态机 + 动作安全校验与风险分级机制”。
原来的程序重点是“收到动作编号并执行”,而现在的程序重点变成了:
收到动作编号
↓
判断是否合法
↓
判断风险等级
↓
必要时自动补站立动作
↓
进入状态机调度
↓
串口互斥执行
↓
异常时安全回正
↓
执行结果回传云端
通过这次设计,我对 Milk-V Duo S 端在整个项目中的作用有了更深理解:
云端 OpenClaw 负责理解用户情绪和生成动作序列,但是 Duo S 端不能只是被动执行。因为真正连接机器人硬件的是 Duo S,真正让舵机运动的也是 Duo S。因此端侧必须承担最后一道安全防线,包括动作校验、风险控制、调度管理、串口互斥和异常恢复。
这次重构让机器人端从“能跑”进一步升级为“可靠地跑、安全地跑、可调试地跑”。这也是情绪互动机器人从一个演示 Demo 走向更完整系统的重要一步。
更多推荐



所有评论(0)