前几周的工作中,我主要围绕 Milk-V Duo S 端开发环境、WiFi 自动连接、串口通信、云端指令轮询、动作组执行以及复杂动作组设计展开。到目前为止,机器人端已经能够完成一个基本闭环:云端 OpenClaw Agent 根据用户输入生成动作序列,Webhook 服务把动作序列存入队列,Milk-V Duo S 端通过 HTTP 轮询获取动作编号,然后通过串口控制机器人执行对应动作组。

但是在继续调试过程中,我发现如果 Duo S 端只是简单地写成:

while True:
    result = poll_action()
    if result:
        run_sequence(result["action_sequence"])
        ack_action(result["command_id"])

虽然能跑通演示,但工程上还不够可靠。因为机器人端不是普通软件程序,它最终控制的是 17 个自由度舵机。一旦动作序列错误、动作正在执行时又收到新任务、网络断开、串口异常、复杂动作连续执行,都可能导致机器人姿态失控或者动作表现不稳定。

因此本周我在 Milk-V Duo S 端设计了两个新的核心模块:

  1. 动作调度状态机模块
  2. 动作安全校验与风险分级模块

这两个模块的目标是把 Duo S 端从“能执行动作”升级为“能够安全、稳定、有状态地执行动作”。

一、为什么要引入动作调度状态机

在最初版本中,Duo S 端的逻辑比较直接:

收到动作序列 -> 执行动作 -> 等待执行完成 -> 回传 ACK

这种方式的问题是:程序内部并没有明确记录机器人当前处于什么状态。

例如:

  • 机器人是否正在轮询云端?
  • 是否已经收到动作但还没执行?
  • 是否正在执行复杂动作组?
  • 当前动作是否通过安全校验?
  • ACK 是否已经成功回传?
  • 如果执行失败,是否进入安全回正?

这些状态如果都混在一个 while True 循环中,后期调试会非常困难。因此我将 Duo S 端抽象成一个状态机。

状态机的核心思想是:机器人端在任意时刻都只处于一个明确状态,并且状态之间按照固定流程切换。

本项目中我设计的状态如下:

状态 含义
IDLE 空闲状态,机器人等待任务
POLLING 正在向云端轮询动作任务
RECEIVED 已收到云端任务
VALIDATING 正在校验动作序列
RUNNING 正在执行动作序列
ACKING 正在向云端回传执行结果
ERROR 执行或通信出现异常
RECOVERING 异常后执行安全回正

状态流转关系如下:

IDLE
  ↓
POLLING
  ↓
RECEIVED
  ↓
VALIDATING
  ↓
RUNNING
  ↓
ACKING
  ↓
IDLE

如果中途异常:
ERROR → RECOVERING → IDLE

技术理解:

机器人动作执行具有明显的时间属性。一个动作组不是瞬间完成的,而是需要几秒甚至几十秒。在这段时间里,串口不能被其他动作序列抢占。因此 Duo S 端必须知道自己是否正在执行动作。状态机相当于给机器人端建立了一套“任务生命周期管理机制”,让每一次动作执行都可追踪、可控制、可恢复。


二、使用 AI 辅助生成状态枚举代码

提示词 1:生成机器人端状态枚举

请帮我生成一个运行在 Milk-V Duo S 开发板上的 Python 状态枚举模块,用于基于云端 OpenClaw 的情绪互动机器人项目。

要求:
1. 使用 Python 标准库 enum.Enum。
2. 定义机器人端执行状态 RobotState。
3. 状态包括:
   - IDLE:空闲
   - POLLING:正在轮询云端
   - RECEIVED:已收到任务
   - VALIDATING:正在校验动作序列
   - RUNNING:正在执行动作
   - ACKING:正在回传 ACK
   - ERROR:异常状态
   - RECOVERING:安全恢复状态
4. 每个枚举值使用字符串,方便打印日志。
5. 代码要简洁清晰,可以直接保存为 robot_state.py。

AI 生成代码:robot_state.py

from enum import Enum


class RobotState(Enum):
    """
    Milk-V Duo S 端机器人执行状态枚举。
    每一个状态表示机器人端任务生命周期中的一个阶段。
    """

    IDLE = "IDLE"                 # 空闲,等待任务
    POLLING = "POLLING"           # 正在轮询云端
    RECEIVED = "RECEIVED"         # 已收到动作任务
    VALIDATING = "VALIDATING"     # 正在校验动作序列
    RUNNING = "RUNNING"           # 正在执行动作序列
    ACKING = "ACKING"             # 正在回传执行确认
    ERROR = "ERROR"               # 执行或通信异常
    RECOVERING = "RECOVERING"     # 异常后安全恢复


技术理解

这里没有直接使用普通字符串,而是使用 Enum 枚举。这样做的好处是:

  1. 避免字符串拼写错误;
  2. 状态值统一管理;
  3. 日志打印更加清晰;
  4. 后期增加新状态更方便;
  5. 代码可读性更强。

比如如果手写:

state = "RUNING"

少写一个 N 程序也不会报错,但状态判断会失败。而枚举可以避免这类问题。

三、动作元数据与风险分级设计

原来 Duo S 端只有一个简单的动作时长表:

ACTION_DURATION = {
    0: 0,
    1: 12,
    2: 10,
    3: 10,
    4: 9,
    5: 15,
    6: 72,
    7: 20,
    8: 10,
    9: 13,
    10: 12,
    11: 15,
    12: 15,
    13: 12,
    14: 16,
    15: 18,
    16: 20
}

这种方式只能告诉程序“动作要等几秒”,但是不能告诉程序这个动作是否危险、是否需要执行前站立、是否需要执行后回正。

随着我们新增了 11-前进招手12-鞠躬谢谢13-下蹲保护 这类复杂手脚协同动作,简单时长表已经不够用了。

因此我将动作库升级为“动作元数据表”。

每个动作不仅有编号,还有:

  • 动作名称
  • 预计执行时间
  • 风险等级
  • 是否需要动作前站立
  • 是否需要动作后回正
  • 是否属于复杂动作

四、使用 AI 辅助生成动作元数据模块

提示词 2:生成动作元数据表

请帮我生成一个 Python 模块 action_meta.py,用于 Milk-V Duo S 端机器人动作库管理。

项目背景:
机器人通过云端 OpenClaw Agent 获取动作编号序列,Milk-V Duo S 端根据动作编号控制机器人执行本地动作组。现在需要把简单的 ACTION_DURATION 升级为动作元数据表。

要求:
1. 定义动作风险等级:
   - LOW
   - MEDIUM
   - HIGH
2. 使用字典 ACTION_META 管理动作信息。
3. 每个动作包含:
   - name:动作名称
   - duration:预计执行时间,单位秒
   - risk:风险等级
   - need_stand_before:执行前是否需要站立
   - need_stand_after:执行后是否需要回正
   - complex:是否为复杂动作组
4. 包含动作编号:
   0 站立
   1 挥手
   2 摇头
   3 左右致谢
   4 交通指挥
   5 舞蹈
   6 自由飞翔
   7 下蹲
   8 前进
   9 后退
   10 普通动作
   11 前进招手
   12 鞠躬谢谢
   13 下蹲保护
   16 左招手
   17 右招手
5. 提供函数:
   - get_action_meta(action_id)
   - get_action_duration(action_id)
   - is_high_risk(action_id)
   - need_stand_before(action_id)
   - need_stand_after(action_id)
6. 代码要能在嵌入式 Linux 的 Python 环境中运行,不依赖第三方库。

AI 生成代码:action_meta.py

from enum import Enum


class RiskLevel(Enum):
    """
    动作风险等级。
    LOW:普通动作,风险较低。
    MEDIUM:需要一定姿态控制的动作。
    HIGH:涉及大幅度重心变化或手脚协同的复杂动作。
    """

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


ACTION_META = {
    0: {
        "name": "站立",
        "duration": 5,
        "risk": RiskLevel.LOW,
        "need_stand_before": False,
        "need_stand_after": False,
        "complex": False
    },
    1: {
        "name": "挥手",
        "duration": 12,
        "risk": RiskLevel.LOW,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    2: {
        "name": "摇头",
        "duration": 10,
        "risk": RiskLevel.LOW,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    3: {
        "name": "左右致谢",
        "duration": 10,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    4: {
        "name": "交通指挥",
        "duration": 9,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    5: {
        "name": "舞蹈",
        "duration": 15,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    6: {
        "name": "自由飞翔",
        "duration": 72,
        "risk": RiskLevel.HIGH,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    7: {
        "name": "下蹲",
        "duration": 20,
        "risk": RiskLevel.HIGH,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    8: {
        "name": "前进",
        "duration": 10,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    9: {
        "name": "后退",
        "duration": 13,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    10: {
        "name": "普通动作",
        "duration": 12,
        "risk": RiskLevel.LOW,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    11: {
        "name": "前进招手",
        "duration": 22,
        "risk": RiskLevel.HIGH,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    12: {
        "name": "鞠躬谢谢",
        "duration": 8,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    13: {
        "name": "下蹲保护",
        "duration": 8,
        "risk": RiskLevel.HIGH,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": True
    },
    16: {
        "name": "左招手",
        "duration": 4,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    },
    17: {
        "name": "右招手",
        "duration": 4,
        "risk": RiskLevel.MEDIUM,
        "need_stand_before": True,
        "need_stand_after": True,
        "complex": False
    }
}


def get_action_meta(action_id):
    """
    获取动作元数据。
    如果动作编号不存在,返回 None。
    """
    return ACTION_META.get(action_id)


def get_action_duration(action_id):
    """
    获取动作预计执行时间。
    如果动作不存在,则返回默认 3 秒,避免程序直接崩溃。
    """
    meta = get_action_meta(action_id)
    if meta is None:
        return 3
    return meta["duration"]


def is_high_risk(action_id):
    """
    判断动作是否为高风险动作。
    """
    meta = get_action_meta(action_id)
    if meta is None:
        return False
    return meta["risk"] == RiskLevel.HIGH


def need_stand_before(action_id):
    """
    判断动作执行前是否需要先站立。
    """
    meta = get_action_meta(action_id)
    if meta is None:
        return False
    return meta["need_stand_before"]


def need_stand_after(action_id):
    """
    判断动作执行后是否需要回到站立。
    """
    meta = get_action_meta(action_id)
    if meta is None:
        return False
    return meta["need_stand_after"]


def get_action_name(action_id):
    """
    获取动作名称,主要用于日志打印。
    """
    meta = get_action_meta(action_id)
    if meta is None:
        return "未知动作"
    return meta["name"]

技术理解

动作编号本身没有任何语义。比如 11 对 Python 来说只是一个整数,但是在机器人系统中,它代表的是一个包含前进、招手、重心变化和回正过程的复杂动作组。

所以我把动作编号升级为动作元数据后,Duo S 端就可以根据动作属性做出不同策略:

  • 普通动作可以直接执行;
  • 复杂动作前先站立;
  • 高风险动作后必须回正;
  • 未知动作直接拒绝执行;
  • 长动作需要更长等待时间;
  • 连续高风险动作需要限制。

这样端侧就不再只是被动执行云端编号,而是具备了一定的安全判断能力。


五、动作序列安全校验设计

云端 OpenClaw Agent 在生成动作序列时已经设置了动作白名单和 normalize 兜底校验,但是我认为 Duo S 端仍然必须再校验一次。

原因是:云端出错最多是 JSON 不规范,而 Duo S 端出错会直接控制舵机运动。

因此我设计了端侧动作序列校验规则:

校验项目 作用
是否为 list 防止云端返回格式错误
是否为空 防止空动作执行
长度是否过长 防止机器人连续运行过久
动作编号是否合法 防止未知动作编号
高风险动作是否过多 防止连续复杂动作造成不稳定
是否自动补站立动作 保证执行前后安全

六、使用 AI 辅助生成动作校验模块

提示词 3:生成动作序列安全校验代码

请帮我生成一个 Python 模块 action_validator.py,用于 Milk-V Duo S 机器人端动作序列安全校验。

背景:
云端 OpenClaw Agent 会返回 action_sequence,例如 [11, 12, 0]。Milk-V Duo S 端需要在真正执行舵机动作之前进行二次校验。

要求:
1. 从 action_meta.py 中导入 ACTION_META、is_high_risk、need_stand_before、need_stand_after。
2. 设计 validate_sequence(sequence) 函数。
3. 校验规则:
   - sequence 必须是 list
   - sequence 不能为空
   - sequence 长度不能超过 MAX_SEQUENCE_LENGTH
   - 每个 action_id 必须在 ACTION_META 中
   - 连续高风险动作不能超过 MAX_HIGH_RISK_COUNT
4. 设计 normalize_sequence(sequence) 函数。
   - 如果第一个动作需要站立前置,则自动在前面插入 0
   - 如果最后一个动作需要站立回正,则自动在最后插入 0
   - 避免重复插入连续的 0
5. 设计 remove_duplicate_stand(sequence) 函数,去除连续站立动作。
6. 返回值要包含是否通过、错误原因、规范化后的动作序列。
7. 不依赖第三方库。

AI 生成代码:action_validator.py

from action_meta import ACTION_META
from action_meta import is_high_risk
from action_meta import need_stand_before
from action_meta import need_stand_after


MAX_SEQUENCE_LENGTH = 8
MAX_HIGH_RISK_COUNT = 2
STAND_ACTION_ID = 0


def remove_duplicate_stand(sequence):
    """
    去除连续重复的站立动作。
    例如 [0, 0, 11, 0, 0] 会变成 [0, 11, 0]。
    """
    if not sequence:
        return sequence

    cleaned = []
    last_action = None

    for action in sequence:
        if action == STAND_ACTION_ID and last_action == STAND_ACTION_ID:
            continue

        cleaned.append(action)
        last_action = action

    return cleaned


def normalize_sequence(sequence):
    """
    对动作序列进行规范化处理。
    主要包括:
    1. 高风险或复杂动作前自动补站立;
    2. 高风险或复杂动作后自动回正;
    3. 删除连续重复的站立动作。
    """
    if not sequence:
        return sequence

    normalized = list(sequence)

    first_action = normalized[0]
    last_action = normalized[-1]

    # 如果第一个动作需要前置站立,并且当前不是站立,则自动补 0。
    if need_stand_before(first_action) and first_action != STAND_ACTION_ID:
        normalized.insert(0, STAND_ACTION_ID)

    # 如果最后一个动作需要执行后回正,并且当前最后不是站立,则自动补 0。
    if need_stand_after(last_action) and last_action != STAND_ACTION_ID:
        normalized.append(STAND_ACTION_ID)

    normalized = remove_duplicate_stand(normalized)

    return normalized


def validate_sequence(sequence):
    """
    校验云端返回的动作序列是否合法。

    返回:
    {
        "ok": True 或 False,
        "reason": "校验结果说明",
        "sequence": 规范化后的动作序列
    }
    """

    if not isinstance(sequence, list):
        return {
            "ok": False,
            "reason": "动作序列必须是 list 类型",
            "sequence": []
        }

    if len(sequence) == 0:
        return {
            "ok": False,
            "reason": "动作序列不能为空",
            "sequence": []
        }

    if len(sequence) > MAX_SEQUENCE_LENGTH:
        return {
            "ok": False,
            "reason": f"动作序列过长,最大长度为 {MAX_SEQUENCE_LENGTH}",
            "sequence": []
        }

    high_risk_count = 0

    for action_id in sequence:
        if not isinstance(action_id, int):
            return {
                "ok": False,
                "reason": f"动作编号必须是整数,当前值为 {action_id}",
                "sequence": []
            }

        if action_id not in ACTION_META:
            return {
                "ok": False,
                "reason": f"非法动作编号:{action_id}",
                "sequence": []
            }

        if is_high_risk(action_id):
            high_risk_count += 1

    if high_risk_count > MAX_HIGH_RISK_COUNT:
        return {
            "ok": False,
            "reason": f"高风险动作过多,最多允许 {MAX_HIGH_RISK_COUNT} 个",
            "sequence": []
        }

    normalized = normalize_sequence(sequence)

    return {
        "ok": True,
        "reason": "动作序列校验通过",
        "sequence": normalized
    }

技术理解

这段代码最核心的地方不是判断 list,而是对机器人动作风险进行限制。

例如云端如果返回:

[11, 13, 6, 7]

虽然每个动作编号都存在,但这个序列对机器人来说风险很高。因为:

  • 11 是前进招手,涉及手脚协同;
  • 13 是下蹲保护,涉及低重心;
  • 6 是自由飞翔,执行时间长;
  • 7 是下蹲,涉及大幅姿态变化。

如果连续执行这类动作,机器人容易出现姿态累积误差。因此端侧必须限制高风险动作数量。

另一个重要设计是自动补站立动作。例如云端返回:

[11, 12]

端侧会自动规范化成:

[0, 11, 12, 0]

这样可以保证复杂动作前先回到标准站立,复杂动作后也回到标准站立。


七、机器人串口控制模块优化

我原来的 RobotController 已经可以通过串口发送动作指令:

PL0 SQ1 SM100 ONCE

其中:

  • PL0 表示动作组区域;
  • SQ1 表示执行 1 号动作组;
  • SM100 表示动作速度;
  • ONCE 表示执行一次。

但是状态机版本需要更安全的串口控制,因此我加入了:

  1. 串口互斥锁;
  2. 动作执行函数;
  3. 安全停止函数;
  4. 默认速度配置;
  5. 串口异常捕获。

八、使用 AI 辅助生成串口控制模块

提示词 4:生成安全串口控制模块

请帮我生成一个 Python 模块 robot_controller.py,用于 Milk-V Duo S 通过串口控制机器人动作组。

项目背景:
机器人动作组通过 32 路舵机控制板执行,Milk-V Duo S 使用 /dev/ttyS0 串口发送动作组命令,波特率 115200。动作指令格式为:
PL0 SQ{action_id} SM{speed} ONCE

要求:
1. 使用 pyserial。
2. 定义 RobotController 类。
3. 初始化参数包括 port="/dev/ttyS0"、baudrate=115200、default_speed=100。
4. 提供 send_cmd(cmd) 方法,自动添加 \r\n。
5. 提供 run_action(action_id, speed=None) 方法。
6. 提供 stand() 方法,执行 0 号站立动作。
7. 提供 stop() 方法,发送 PL0 停止动作。
8. 提供 close() 方法关闭串口。
9. 使用 threading.Lock 保证同一时间只有一个线程写串口。
10. 代码要有异常处理和打印日志,适合在嵌入式 Linux 上运行。

AI 生成代码:robot_controller.py

import time
import threading
import serial


class RobotController:
    """
    Milk-V Duo S 端机器人串口控制类。
    负责把动作编号转换为舵机控制板可以识别的串口指令。
    """

    def __init__(self, port="/dev/ttyS0", baudrate=115200, default_speed=100):
        self.port = port
        self.baudrate = baudrate
        self.default_speed = default_speed
        self.lock = threading.Lock()

        try:
            self.ser = serial.Serial(port, baudrate, timeout=1)
            time.sleep(1)
            print(f"[RobotController] 串口已打开: {port}, baudrate={baudrate}")
        except Exception as e:
            print(f"[RobotController] 串口打开失败: {e}")
            raise e

    def send_cmd(self, cmd):
        """
        发送原始串口命令。
        """
        full_cmd = cmd + "\r\n"

        with self.lock:
            try:
                self.ser.write(full_cmd.encode("utf-8"))
                print(f"[RobotController] 发送: {cmd}")
                return True
            except Exception as e:
                print(f"[RobotController] 串口发送失败: {e}")
                return False

    def run_action(self, action_id, speed=None):
        """
        执行动作组。
        """
        if speed is None:
            speed = self.default_speed

        cmd = f"PL0 SQ{action_id} SM{speed} ONCE"
        return self.send_cmd(cmd)

    def stand(self):
        """
        执行 0 号站立动作,作为安全回正。
        """
        print("[RobotController] 执行站立回正")
        return self.run_action(0, speed=self.default_speed)

    def stop(self):
        """
        停止当前动作。
        """
        print("[RobotController] 停止动作")
        return self.send_cmd("PL0")

    def close(self):
        """
        关闭串口。
        """
        try:
            self.ser.close()
            print("[RobotController] 串口已关闭")
        except Exception as e:
            print(f"[RobotController] 串口关闭失败: {e}")

技术理解

这里最关键的是 threading.Lock()

虽然当前主程序大多数时候是单线程运行,但后续如果加入网络监听、日志线程、异常恢复线程,就可能出现多个地方同时调用串口的情况。串口是典型的独占资源,如果多个动作命令同时写入,可能出现命令交错,导致舵机控制板接收到错误指令。

因此我在 send_cmd() 中加锁,保证同一时刻只有一条命令写入串口。


九、动作调度器设计

有了状态枚举、动作元数据、安全校验和串口控制之后,还需要一个统一的调度器负责把它们组合起来。

动作调度器的职责是:

  1. 接收云端动作序列;
  2. 切换到校验状态;
  3. 调用校验模块;
  4. 执行动作序列;
  5. 根据动作时长等待;
  6. 发生异常时进入恢复状态;
  7. 执行结束后回到 IDLE。

十、使用 AI 辅助生成动作调度器

提示词 5:生成状态机动作调度器

请帮我生成一个 Python 模块 action_scheduler.py,用于 Milk-V Duo S 端机器人动作调度。

项目背景:
云端 OpenClaw Agent 返回动作序列,例如 [11, 12, 0]。Milk-V Duo S 端需要先校验动作序列,再通过串口执行动作组,并在异常时进行安全回正。

要求:
1. 导入 RobotState。
2. 导入 action_validator.validate_sequence。
3. 导入 action_meta.get_action_duration、get_action_name、is_high_risk。
4. 定义 ActionScheduler 类。
5. 初始化时传入 RobotController 对象。
6. 内部维护 state 状态,初始为 IDLE。
7. 提供 set_state(new_state) 方法,打印状态切换日志。
8. 提供 run_sequence(sequence) 方法:
   - 切换到 VALIDATING
   - 校验动作序列
   - 校验失败则进入 ERROR 和 RECOVERING
   - 校验成功后切换到 RUNNING
   - 逐个执行动作
   - 根据动作时长 sleep
   - 高风险动作执行前后打印提示
   - 执行完成后切换回 IDLE
9. 提供 recover() 方法:
   - 切换到 RECOVERING
   - 停止当前动作
   - 执行站立动作
   - 等待 5 秒
   - 回到 IDLE
10. 使用 running 标志位防止重复执行。
11. 代码要详细、清晰、有注释。

AI 生成代码:action_scheduler.py

import time

from robot_state import RobotState
from action_validator import validate_sequence
from action_meta import get_action_duration
from action_meta import get_action_name
from action_meta import is_high_risk


class ActionScheduler:
    """
    Milk-V Duo S 端动作调度状态机。
    负责动作序列的校验、执行、等待和异常恢复。
    """

    def __init__(self, robot):
        self.robot = robot
        self.state = RobotState.IDLE
        self.running = False

    def set_state(self, new_state):
        """
        切换机器人状态,并打印状态变化。
        """
        old_state = self.state
        self.state = new_state
        print(f"[Scheduler] 状态切换: {old_state.value} -> {new_state.value}")

    def run_sequence(self, sequence):
        """
        执行动作序列。
        """

        if self.running:
            print("[Scheduler] 当前已有动作正在执行,拒绝新序列")
            return {
                "ok": False,
                "reason": "机器人正在执行上一条动作序列"
            }

        self.running = True

        try:
            self.set_state(RobotState.VALIDATING)

            result = validate_sequence(sequence)

            if not result["ok"]:
                print(f"[Scheduler] 动作序列校验失败: {result['reason']}")
                self.set_state(RobotState.ERROR)
                self.recover()
                return {
                    "ok": False,
                    "reason": result["reason"]
                }

            safe_sequence = result["sequence"]
            print(f"[Scheduler] 校验通过,规范化动作序列: {safe_sequence}")

            self.set_state(RobotState.RUNNING)

            for action_id in safe_sequence:
                action_name = get_action_name(action_id)
                duration = get_action_duration(action_id)

                if is_high_risk(action_id):
                    print(f"[Scheduler] 即将执行高风险动作: {action_id} - {action_name}")

                print(f"[Scheduler] 执行动作: {action_id} - {action_name}, 预计 {duration} 秒")

                ok = self.robot.run_action(action_id)

                if not ok:
                    raise RuntimeError(f"动作 {action_id} 串口发送失败")

                time.sleep(duration)

                print(f"[Scheduler] 动作完成: {action_id} - {action_name}")

            self.set_state(RobotState.IDLE)
            print("[Scheduler] 动作序列执行完成")

            return {
                "ok": True,
                "reason": "动作序列执行完成"
            }

        except Exception as e:
            print(f"[Scheduler] 执行异常: {e}")
            self.set_state(RobotState.ERROR)
            self.recover()

            return {
                "ok": False,
                "reason": str(e)
            }

        finally:
            self.running = False

    def recover(self):
        """
        异常恢复流程。
        无论发生什么错误,都尽量让机器人停止并回到站立姿态。
        """

        self.set_state(RobotState.RECOVERING)

        try:
            self.robot.stop()
            time.sleep(1)

            self.robot.stand()
            time.sleep(5)

            print("[Scheduler] 安全回正完成")

        except Exception as e:
            print(f"[Scheduler] 安全恢复失败: {e}")

        finally:
            self.set_state(RobotState.IDLE)

技术理解

动作调度器是本篇博客最核心的模块。它把之前分散在不同文件里的逻辑整合成一个完整流程:

动作序列输入
  ↓
动作安全校验
  ↓
动作序列规范化
  ↓
逐个动作执行
  ↓
根据动作元数据等待
  ↓
异常时安全回正

其中 running 标志位非常重要。它可以防止机器人在一个动作还没执行完时又执行新动作。例如,机器人正在执行 11-前进招手,如果云端又发来 13-下蹲保护,直接执行会导致两个动作在姿态上冲突。通过 running 可以拒绝新序列,保证动作执行的完整性。

recover() 函数也很关键。机器人系统不能像普通程序一样出错后直接退出,因为退出时机器人可能停留在半下蹲、半抬手、前倾等不稳定姿态。异常恢复模块会先发送停止命令,再执行 0 号站立动作,让机器人尽量回到安全状态。


十一、云端轮询与状态机集成

原来的 Duo S 端云端客户端主要负责三件事:

  1. 轮询 /poll/{client_id}
  2. 执行动作序列;
  3. 调用 /ack/{client_id} 回传确认。

现在加入状态机后,主程序的结构更加清晰:

POLLING:轮询云端
RECEIVED:收到任务
VALIDATING/RUNNING:交给 ActionScheduler
ACKING:回传执行结果
IDLE:等待下一轮

十二、使用 AI 辅助生成主程序集成代码

提示词 6:生成状态机主程序

请帮我生成一个 Python 主程序 main.py,用于 Milk-V Duo S 端连接云端 OpenClaw Webhook 服务并执行机器人动作。

项目背景:
云端服务提供:
GET  /poll/{client_id}
POST /ack/{client_id}

Milk-V Duo S 端需要周期性轮询云端,如果收到 action_sequence,则交给 ActionScheduler 执行,执行完成后回传 ACK。

要求:
1. 导入 requests、time。
2. 导入 RobotState、RobotController、ActionScheduler。
3. 配置 SERVER、CLIENT_ID、POLL_INTERVAL。
4. 定义 poll_action():
   - GET 请求云端 poll 接口
   - timeout=5
   - 异常时返回 None
5. 定义 ack_action(command_id, execute_result):
   - POST 请求云端 ack 接口
   - 回传 command_id、status、reason
6. main() 函数中:
   - 初始化 RobotController
   - 初始化 ActionScheduler
   - 使用 last_command_id 防止重复执行
   - 状态切换包括 POLLING、RECEIVED、ACKING、IDLE
   - 收到 action_sequence 后执行 scheduler.run_sequence(sequence)
   - 执行后回传 ACK
7. KeyboardInterrupt 时停止机器人并关闭串口。
8. 代码要适合在 Milk-V Duo S 的 Python 环境中运行。

AI 生成代码:main.py

import time
import requests

from robot_state import RobotState
from robot_controller import RobotController
from action_scheduler import ActionScheduler


SERVER = "http://47.93.27.196:8765"
CLIENT_ID = "milk_duos_001"
POLL_INTERVAL = 2


def poll_action():
    """
    从云端轮询动作任务。
    """
    url = f"{SERVER}/poll/{CLIENT_ID}"

    try:
        response = requests.get(url, timeout=5)
        return response.json()
    except Exception as e:
        print(f"[Cloud] 轮询失败: {e}")
        return None


def ack_action(command_id, execute_result):
    """
    向云端回传动作执行结果。
    """
    url = f"{SERVER}/ack/{CLIENT_ID}"

    status = "done" if execute_result.get("ok") else "failed"

    data = {
        "command_id": command_id,
        "client_id": CLIENT_ID,
        "status": status,
        "reason": execute_result.get("reason")
    }

    try:
        requests.post(url, json=data, timeout=5)
        print(f"[Cloud] ACK 已回传: {data}")
        return True
    except Exception as e:
        print(f"[Cloud] ACK 回传失败: {e}")
        return False


def main():
    """
    Milk-V Duo S 端主循环。
    """
    print("[Main] Duo S 端机器人客户端启动")

    robot = RobotController()
    scheduler = ActionScheduler(robot)

    last_command_id = None

    try:
        while True:
            scheduler.set_state(RobotState.POLLING)

            result = poll_action()

            if result is None:
                scheduler.set_state(RobotState.IDLE)
                time.sleep(POLL_INTERVAL)
                continue

            if result.get("status") == "no_action":
                scheduler.set_state(RobotState.IDLE)
                time.sleep(POLL_INTERVAL)
                continue

            if result.get("action_sequence"):
                scheduler.set_state(RobotState.RECEIVED)

                command_id = result.get("command_id")
                sequence = result.get("action_sequence")

                if command_id == last_command_id:
                    print(f"[Main] 重复 command_id,跳过执行: {command_id}")
                    scheduler.set_state(RobotState.IDLE)
                    time.sleep(POLL_INTERVAL)
                    continue

                print(f"[Main] 收到云端动作任务: command_id={command_id}, sequence={sequence}")

                execute_result = scheduler.run_sequence(sequence)

                scheduler.set_state(RobotState.ACKING)
                ack_action(command_id, execute_result)

                last_command_id = command_id
                scheduler.set_state(RobotState.IDLE)

            else:
                print(f"[Main] 未知云端响应: {result}")
                scheduler.set_state(RobotState.IDLE)

            time.sleep(POLL_INTERVAL)

    except KeyboardInterrupt:
        print("[Main] 手动停止程序")
        robot.stop()
        robot.close()


if __name__ == "__main__":
    main()

技术理解

这里保留了 last_command_id 防重复执行机制。因为 HTTP 轮询模式下,可能出现一种情况:Duo S 已经执行了动作,但 ACK 回传失败,云端下一次仍然返回同一个 command_id。如果没有防重复机制,机器人会重复执行同一个动作。

因此 last_command_id 是一个很重要的端侧保护:

if command_id == last_command_id:
    print("重复 command_id,跳过执行")

状态机主程序的好处是,每一个阶段都能从日志中看出来:

IDLE -> POLLING
POLLING -> RECEIVED
RECEIVED -> VALIDATING
VALIDATING -> RUNNING
RUNNING -> ACKING
ACKING -> IDLE

这样调试时如果机器人没有动,我可以很快判断问题出在哪里:

  • 停在 POLLING:可能是网络问题;
  • 停在 RECEIVED:可能是任务格式问题;
  • 停在 VALIDATING:可能是动作编号不合法;
  • 停在 RUNNING:可能是串口或动作执行问题;
  • 停在 ACKING:可能是云端确认接口问题。

十三、完整文件结构设计

经过本次重构后,Milk-V Duo S 端代码结构可以设计成:

milkv-Duo-S/
├── main.py                 # 主程序入口,负责云端轮询和整体状态切换
├── robot_state.py          # 机器人端状态枚举
├── action_meta.py          # 动作元数据和风险分级
├── action_validator.py     # 动作序列安全校验和规范化
├── robot_controller.py     # 串口控制封装
├── action_scheduler.py     # 动作调度状态机
├── sequence_control.py     # 旧版动作序列执行模块
├── test_servo.py           # 本地串口测试
└── README.md               # 端侧运行说明

技术理解:

这种结构比之前所有逻辑都放在一个文件中更清晰。每个文件只负责一个功能:

  • robot_state.py 只负责状态定义;
  • action_meta.py 只负责动作信息;
  • action_validator.py 只负责安全校验;
  • robot_controller.py 只负责串口发送;
  • action_scheduler.py 只负责动作调度;
  • main.py 只负责端云通信主循环。

这体现了软件工程中的模块化思想。模块化以后,后期如果要增加新的复杂动作组,只需要修改 action_meta.py;如果要调整安全规则,只需要修改 action_validator.py;如果要更换通信方式,也不会影响底层动作执行逻辑。


十四、端侧安全机制总结

本次设计后,Duo S 端具备了多层安全机制:

安全机制 作用
动作白名单 防止未知动作编号执行
序列长度限制 防止动作过长导致机器人连续运行
高风险动作数量限制 防止连续复杂动作造成失稳
自动前置站立 保证复杂动作从标准姿态开始
自动后置站立 保证动作结束后安全回正
串口互斥锁 防止多线程同时写串口
running 标志位 防止动作序列并发执行
last_command_id 防止云端重复任务被重复执行
recover 异常恢复 出错后尽量停止并站立
状态机日志 方便定位问题发生在哪个阶段

十五、本周总结

本周我将原本较简单的 Duo S 端动作执行逻辑升级成了“动作调度状态机 + 动作安全校验与风险分级机制”。

原来的程序重点是“收到动作编号并执行”,而现在的程序重点变成了:

收到动作编号
  ↓
判断是否合法
  ↓
判断风险等级
  ↓
必要时自动补站立动作
  ↓
进入状态机调度
  ↓
串口互斥执行
  ↓
异常时安全回正
  ↓
执行结果回传云端

通过这次设计,我对 Milk-V Duo S 端在整个项目中的作用有了更深理解:

云端 OpenClaw 负责理解用户情绪和生成动作序列,但是 Duo S 端不能只是被动执行。因为真正连接机器人硬件的是 Duo S,真正让舵机运动的也是 Duo S。因此端侧必须承担最后一道安全防线,包括动作校验、风险控制、调度管理、串口互斥和异常恢复。

这次重构让机器人端从“能跑”进一步升级为“可靠地跑、安全地跑、可调试地跑”。这也是情绪互动机器人从一个演示 Demo 走向更完整系统的重要一步。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐