一、引言:为什么需要行为规划?

当前的大语言模型(LLM)Agent 大多采用 ReAct(Reasoning + Acting) 范式:接收到用户问题后,模型输出思考链条,然后调用工具,观察结果,再继续推理。这种"一个回合一个动作"的方式在简单任务上表现良好,但面对复杂、多步骤、需要长期规划的请求时,就会暴露出明显的缺陷:

  • 缺乏全局视野:只见当前这一步,不知道整体的执行路线
  • 容易陷入死循环:在某个子任务上反复尝试相同操作
  • 无法应对动态变化:一旦工具返回意外结果,不知道如何调整计划
  • 上下文窗口浪费:每次思考都带着全部历史,token 消耗巨大

Plan-and-Execute(计划与执行分离)架构正是为了解决这些问题而生的。它的核心思想很简单:把"想"和"做"分开——规划模块负责制定全局方案,执行模块负责按计划一步步落实,监控模块则持续观察执行状态,在必要时触发重规划。

本文将从零实现一个完整的 Plan-and-Execute 智能体行为规划系统,包含:
1. 任务分解与依赖图构建
2. 子任务调度器
3. Plan-then-Execute 与 Plan-while-Execute 两种执行模式
4. 执行监控与动态重规划
5. 完整可运行的 Python 代码

二、整体架构设计

我们先来看系统的整体架构:

用户请求
    │
    ▼
┌─────────────────┐
│   Task Planner   │  ← 任务分解器:分析请求 → 生成子任务列表 + 依赖关系
│  (LLM + Parser)  │
└────────┬────────┘
         │  Plan: [子任务1 → 子任务2 → 子任务3 ...]
         ▼
┌─────────────────┐
│  Task Scheduler  │  ← 调度器:根据依赖图确定执行顺序
│   (DAG 拓扑排序)  │
└────────┬────────┘
         ▼
┌─────────────────────┐
│   Executor Engine    │  ← 执行引擎:按顺序/并行执行子任务
│  (LLM per subtask)   │
└────────┬────────────┘
         │  Result per subtask
         ▼
┌─────────────────┐
│  Monitor &       │  ← 监控器:检查执行结果,决策是否重计划
│   Re-planner     │
└────────┬────────┘
         │  ✅ Done or 🔄 Re-plan
         ▼
      Final Response

这样的分层设计带来了几个关键好处:

  1. 模块化:每一层职责清晰,可以独立替换或升级
  2. 可观测性:可以记录和可视化完整的"计划 → 执行 → 结果"链条
  3. 健壮性:子任务失败不会导致整个计划崩溃,可以局部重试或重规划
  4. 效率:规划阶段只需一次 LLM 调用,执行阶段每个子任务一个轻量级 LLM 调用

三、核心数据结构

首先定义我们的核心数据模型,让代码清晰可读:

from dataclasses import dataclass, field
from typing import Optional
import enum

class SubTaskStatus(enum.Enum):
    PENDING = "pending"          # 等待执行
    IN_PROGRESS = "in_progress"  # 正在执行
    COMPLETED = "completed"      # 执行成功
    FAILED = "failed"            # 执行失败
    SKIPPED = "skipped"          # 跳过(依赖失败)

@dataclass
class SubTask:
    """单个子任务"""
    id: str                      # 唯一标识,如 "task_1"
    description: str             # 任务描述
    depends_on: list[str] = field(default_factory=list)  # 依赖的子任务ID列表
    status: SubTaskStatus = SubTaskStatus.PENDING
    result: Optional[str] = None # 执行结果
    error: Optional[str] = None  # 错误信息

@dataclass
class Plan:
    """完整执行计划"""
    goal: str                    # 原始用户目标
    subtasks: list[SubTask] = field(default_factory=list)
    current_index: int = 0       # 当前执行到第几个子任务
    created_at: float = 0.0
    updated_at: float = 0.0

四、任务分解器(Task Planner)

任务分解器是系统的"大脑"。它接收用户的自然语言请求,利用 LLM 将其拆解为一组有依赖关系的子任务。

4.1 Prompt 设计

分解器的 Prompt 非常关键。我们需要引导 LLM 输出结构化的 JSON 格式:

PLANNER_PROMPT = """你是一位专业的任务规划专家。请将用户的请求分解为一系列有依赖关系的子任务。

要求:
1. 每个子任务必须是一个可独立执行的原子操作
2. 明确标注子任务之间的依赖关系(如果任务B需要任务A的结果才能执行,则B依赖A)
3. 子任务数量在 3-8 个之间
4. 使用 JSON 格式输出

输出格式:
```json
{
  "subtasks": [
    {
      "id": "task_1",
      "description": "子任务描述",
      "depends_on": []
    },
    {
      "id": "task_2",
      "description": "子任务描述",
      "depends_on": ["task_1"]
    }
  ]
}

用户的请求:{goal}
"""

### 4.2 Planner 实现

```python
import json
import re
from typing import Optional

class TaskPlanner:
    """任务分解器"""

    def __init__(self, llm_callable):
        self.llm = llm_callable  # 任意 LLM 调用函数

    def parse_plan(self, llm_response: str) -> dict:
        """从 LLM 返回中提取 JSON plan"""
        # 尝试直接解析
        try:
            return json.loads(llm_response)
        except json.JSONDecodeError:
            pass

        # 尝试从 markdown 代码块中提取
        match = re.search(r'```(?:json)?\s*([\s\S]*?)```', llm_response)
        if match:
            try:
                return json.loads(match.group(1).strip())
            except json.JSONDecodeError:
                pass

        # 尝试从花括号提取
        match = re.search(r'\{[\s\S]*\}', llm_response)
        if match:
            try:
                return json.loads(match.group(0))
            except json.JSONDecodeError:
                pass

        raise ValueError(f"无法从 LLM 响应中解析 plan: {llm_response[:200]}")

    def create_plan(self, goal: str) -> Plan:
        """根据用户目标创建执行计划"""
        prompt = PLANNER_PROMPT.format(goal=goal)
        response = self.llm(prompt)
        plan_data = self.parse_plan(response)

        subtasks = []
        for item in plan_data["subtasks"]:
            subtasks.append(SubTask(
                id=item["id"],
                description=item["description"],
                depends_on=item.get("depends_on", [])
            ))

        import time
        return Plan(
            goal=goal,
            subtasks=subtasks,
            created_at=time.time(),
            updated_at=time.time()
        )

4.3 解析鲁棒性

上面的 parse_plan 方法做了三层兜底:
1. 直接 JSON 解析
2. 从 markdown code block 提取
3. 从第一个花括号对提取

这是因为 LLM 输出格式经常不稳定,这种分级解析策略可以大幅提高成功率。

五、任务调度器(Task Scheduler)

拿到子任务列表后,我们需要排定执行顺序。由于子任务之间存在依赖关系,这是一个典型的有向无环图(DAG)拓扑排序问题。

from collections import deque, defaultdict

class TaskScheduler:
    """基于 DAG 拓扑排序的任务调度器"""

    def schedule(self, plan: Plan) -> list[list[SubTask]]:
        """
        返回按执行阶段分组的子任务列表。
        每一组内的子任务可以并行执行。
        """
        # 构建入度表和邻接表
        in_degree = {task.id: 0 for task in plan.subtasks}
        graph = defaultdict(list)

        for task in plan.subtasks:
            for dep in task.depends_on:
                graph[dep].append(task.id)
                in_degree[task.id] = in_degree.get(task.id, 0) + 1

        # 查找不在依赖列表中的 ID(孤立节点)
        all_ids = {task.id for task in plan.subtasks}
        for dep_list in graph.values():
            for dep in dep_list:
                if dep not in all_ids:
                    raise ValueError(f"子任务 {dep} 依赖了不存在的任务")

        # Kahn 算法实现拓扑排序
        queue = deque([tid for tid, deg in in_degree.items() if deg == 0])
        stages = []

        while queue:
            stage = []
            for _ in range(len(queue)):
                tid = queue.popleft()
                task = next(t for t in plan.subtasks if t.id == tid)
                stage.append(task)

                for neighbor in graph[tid]:
                    in_degree[neighbor] -= 1
                    if in_degree[neighbor] == 0:
                        queue.append(neighbor)

            stages.append(stage)

        # 检查是否有环
        scheduled_count = sum(len(s) for s in stages)
        if scheduled_count != len(plan.subtasks):
            raise ValueError("任务依赖关系存在循环依赖!")

        return stages

    def get_execution_order(self, plan: Plan) -> list[str]:
        """获取平铺的执行顺序(用于日志和展示)"""
        stages = self.schedule(plan)
        order = []
        for i, stage in enumerate(stages):
            for task in stage:
                order.append(task.id)
        return order

    def print_plan(self, plan: Plan):
        """打印可读的执行计划"""
        stages = self.schedule(plan)
        print(f"📋 执行计划:{plan.goal}")
        print(f"  共 {len(plan.subtasks)} 个子任务,{len(stages)} 个执行阶段\n")

        for i, stage in enumerate(stages):
            parallel = " ⚡ 并行" if len(stage) > 1 else ""
            print(f"  阶段 {i+1}{parallel}:")
            for task in stage:
                deps = f" (依赖: {', '.join(task.depends_on)})" if task.depends_on else ""
                print(f"    ├─ {task.id}: {task.description}{deps}")
            print()

调度器的关键输出是执行阶段——同一阶段内的子任务彼此独立,可以并行执行。这对于有多个可用 Agent 的场景尤其有价值。

六、执行引擎(Executor)

执行引擎负责实际执行每个子任务。我设计了两种模式:

6.1 Plan-then-Execute(先计划后执行)

这是最基本的模式:先完整规划,然后按顺序依次执行。

class Executor:
    def __init__(self, llm_callable, tools: dict = None):
        self.llm = llm_callable
        self.tools = tools or {}  # 可用的工具集

    def execute_subtask(self, task: SubTask, context: dict) -> str:
        """执行单个子任务"""
        prompt = f"""你正在执行一个子任务。

子任务描述:{task.description}

已经完成的工作:
{self._format_context(context)}

请完成这个子任务,并输出结果。"""

        result = self.llm(prompt)
        return result

    def _format_context(self, context: dict) -> str:
        """格式化上下文供 LLM 使用"""
        if not context:
            return "(暂无)"

        lines = []
        for task_id, result in context.items():
            lines.append(f"[{task_id}]: {result[:200]}")

        return "\n".join(lines)

6.2 Plan-while-Execute(边计划边执行)

对于复杂任务,我更推荐这种模式:先制定粗略计划,每完成一个阶段后结合新信息微调后续计划。

class AdaptiveExecutor(Executor):
    """自适应执行器:每完成一批子任务后,可以调整后续计划"""

    def __init__(self, planner: TaskPlanner, llm_callable, tools: dict = None):
        super().__init__(llm_callable, tools)
        self.planner = planner

    def execute_and_monitor(
        self, plan: Plan, scheduler: TaskScheduler, max_replans: int = 3
    ) -> Plan:
        stages = scheduler.schedule(plan)
        context = {}
        replan_count = 0

        for stage_idx, stage in enumerate(stages):
            print(f"\n▶ 执行阶段 {stage_idx + 1}/{len(stages)}")

            for task in stage:
                print(f"  执行 {task.id}: {task.description[:50]}...")
                task.status = SubTaskStatus.IN_PROGRESS

                try:
                    result = self.execute_subtask(task, context)
                    task.status = SubTaskStatus.COMPLETED
                    task.result = result
                    context[task.id] = result
                    print(f"  ✅ {task.id} 完成")
                except Exception as e:
                    task.status = SubTaskStatus.FAILED
                    task.error = str(e)
                    print(f"  ❌ {task.id} 失败: {e}")

                    # 决定是否需要重规划
                    if replan_count < max_replans and self._should_replan(plan, task):
                        self._replan(plan, context, stage_idx)
                        replan_count += 1
                        # 重新计算执行阶段
                        stages = scheduler.schedule(plan)
                        break  # 重进 stage for 循环

            plan.updated_at = __import__('time').time()

        return plan

    def _should_replan(self, plan: Plan, failed_task: SubTask) -> bool:
        """判断是否需要重规划"""
        # 如果失败的子任务被后续任务依赖,需要重规划
        for task in plan.subtasks:
            if failed_task.id in task.depends_on:
                return True

        # 如果失败次数过多,触发重规划
        failed_count = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.FAILED)
        return failed_count >= 2

    def _replan(self, plan: Plan, context: dict, current_stage: int):
        """对未执行的部分进行重规划"""
        # 找出未执行的任务
        remaining = [
            t for t in plan.subtasks
            if t.status == SubTaskStatus.PENDING
        ]

        remaining_desc = [t.description for t in remaining]
        context_summary = {k: v[:100] for k, v in context.items()}

        replan_prompt = f"""原始目标:{plan.goal}

已完成的任务及其结果:
{json.dumps(context_summary, ensure_ascii=False, indent=2)}

以下子任务尚未完成:
{json.dumps(remaining_desc, ensure_ascii=False, indent=2)}

由于前面某个子任务失败,请重新规划剩余的子任务。
考虑已完成的工作和失败原因,输出新的 JSON plan。"""

        response = self.llm(replan_prompt)
        new_plan_data = self.planner.parse_plan(response)

        # 用新计划替换未执行的任务
        new_subtasks = [
            t for t in plan.subtasks
            if t.status != SubTaskStatus.PENDING
        ]

        for item in new_plan_data["subtasks"]:
            new_subtasks.append(SubTask(
                id=f"{item.id}_replan",
                description=item["description"],
                depends_on=item.get("depends_on", [])
            ))

        plan.subtasks = new_subtasks
        print(f"  🔄 已重规划,剩余 {len(plan.subtasks) - sum(1 for t in plan.subtasks if t.status in [SubTaskStatus.COMPLETED, SubTaskStatus.FAILED])} 个子任务")

七、监控与重规划引擎(Monitor)

监控器是保持系统鲁棒性的关键组件。它负责:

  1. 检查子任务结果是否与预期一致
  2. 检测是否偏离了原始目标
  3. 在必要时触发重规划或向用户请求澄清
class Monitor:
    def __init__(self, llm_callable):
        self.llm = llm_callable

    def evaluate_progress(self, plan: Plan) -> dict:
        """评估当前执行进度和健康状态"""
        total = len(plan.subtasks)
        completed = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.COMPLETED)
        failed = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.FAILED)

        progress_pct = (completed / total * 100) if total > 0 else 0

        # 用 LLM 检查整体进度是否合理
        check_prompt = f"""请评估以下 AI Agent 的执行进度:

目标:{plan.goal}

子任务状态:
"""
        for task in plan.subtasks:
            status_icon = {
                SubTaskStatus.COMPLETED: "✅",
                SubTaskStatus.FAILED: "❌",
                SubTaskStatus.IN_PROGRESS: "🔄",
                SubTaskStatus.PENDING: "⏳"
            }.get(task.status, "❓")
            check_prompt += f"{status_icon} {task.id}: {task.description}\n"
            if task.result:
                check_prompt += f"   结果: {task.result[:150]}...\n"

        check_prompt += """
请判断:
1. 当前进展是否正常?
2. 是否需要调整策略?
3. 是否应该继续执行还是停下来修改计划?

输出 JSON:
{"status": "on_track|needs_adjustment|off_track", "reason": "xxx", "suggestion": "xxx"}
"""
        response = self.llm(check_prompt)

        try:
            evaluation = json.loads(
                re.search(r'\{[\s\S]*\}', response).group(0)
            )
        except (json.JSONDecodeError, AttributeError):
            evaluation = {"status": "on_track", "reason": "解析失败,默认继续"}

        return {
            "total": total,
            "completed": completed,
            "failed": failed,
            "progress_pct": progress_pct,
            "evaluation": evaluation
        }

八、组装完整系统

现在把所有组件组装成一个完整的 Plan-and-Execute 引擎:

import json
import time

class PlanAndExecuteAgent:
    """完整的 Plan-and-Execute 智能体"""

    def __init__(self, llm_callable, tools: dict = None):
        self.planner = TaskPlanner(llm_callable)
        self.scheduler = TaskScheduler()
        self.executor = AdaptiveExecutor(self.planner, llm_callable, tools)
        self.monitor = Monitor(llm_callable)
        self.history = []  # 执行历史

    def run(self, goal: str, adaptive: bool = True) -> str:
        """执行一个完整的 Plan-and-Execute 流程"""

        # 阶段 1:规划
        print("=" * 50)
        print("🔍 阶段 1: 任务分解")
        plan = self.planner.create_plan(goal)
        print(f"  目标: {plan.goal}")
        print(f"  拆解为 {len(plan.subtasks)} 个子任务")

        # 打印执行计划
        self.scheduler.print_plan(plan)

        # 阶段 2:执行
        print("=" * 50)
        print("🚀 阶段 2: 执行")

        if adaptive:
            plan = self.executor.execute_and_monitor(plan, self.scheduler)
        else:
            stages = self.scheduler.schedule(plan)
            for stage in stages:
                for task in stage:
                    task.status = SubTaskStatus.IN_PROGRESS
                    context = {t.id: t.result for t in plan.subtasks if t.result}
                    result = self.executor.execute_subtask(task, context)
                    task.status = SubTaskStatus.COMPLETED
                    task.result = result
                    print(f"  ✅ {task.id}: {task.description[:40]}...")

        # 阶段 3:监控与评估
        print("=" * 50)
        print("📊 阶段 3: 执行评估")
        evaluation = self.monitor.evaluate_progress(plan)

        report = {
            "status": evaluation["evaluation"].get("status", "unknown"),
            "reason": evaluation["evaluation"].get("reason", ""),
            "progress": f"{evaluation['completed']}/{evaluation['total']}",
            "results": {}
        }

        for task in plan.subtasks:
            report["results"][task.id] = {
                "description": task.description,
                "status": task.status.value,
                "result": task.result[:200] if task.result else None
            }

        self.history.append(report)

        # 生成最终总结
        summary = self._generate_summary(plan, evaluation)
        return summary

    def _generate_summary(self, plan: Plan, evaluation: dict) -> str:
        """生成执行总结"""
        completed = evaluation["completed"]
        total = evaluation["total"]

        lines = [f"## 📋 执行报告:{plan.goal}", ""]
        lines.append(f"**状态**: {evaluation['evaluation'].get('status', 'unknown')}")
        lines.append(f"**进度**: {completed}/{total} 子任务完成")
        lines.append(f"**评估**: {evaluation['evaluation'].get('reason', '无')}")
        lines.append("")

        if evaluation['evaluation'].get('suggestion'):
            lines.append(f"**建议**: {evaluation['evaluation']['suggestion']}")
            lines.append("")

        lines.append("### 子任务详情")
        for task in plan.subtasks:
            status_icon = {
                SubTaskStatus.COMPLETED: "✅",
                SubTaskStatus.FAILED: "❌",
                SubTaskStatus.IN_PROGRESS: "🔄",
                SubTaskStatus.PENDING: "⏳",
                SubTaskStatus.SKIPPED: "⏭️"
            }.get(task.status, "❓")
            lines.append(f"- {status_icon} **{task.id}**: {task.description}")
            if task.result:
                lines.append(f"  - 结果: {task.result[:100]}")

        return "\n".join(lines)

九、完整测试示例

下面用一个"收集和分析技术博客数据"的例子来验证系统:

def mock_llm(prompt: str) -> str:
    """模拟 LLM 响应(用于测试)"""
    if "规划专家" in prompt or "分解" in prompt:
        return """{
  "subtasks": [
    {
      "id": "task_1",
      "description": "搜索互联网上最新的 AI Agent 技术博客",
      "depends_on": []
    },
    {
      "id": "task_2",
      "description": "提取并总结每篇博客的核心观点和技术要点",
      "depends_on": ["task_1"]
    },
    {
      "id": "task_3",
      "description": "对比分析各篇博客的异同点",
      "depends_on": ["task_2"]
    },
    {
      "id": "task_4",
      "description": "生成一份结构化的分析报告",
      "depends_on": ["task_3"]
    }
  ]
}"""

    if "子任务" in prompt and "执行" in prompt:
        task_desc = [line for line in prompt.split('\n') if '子任务描述' in line]
        if task_desc:
            desc = task_desc[0].split(':')[-1].strip()
        else:
            desc = "执行子任务"
        return f"已完成: {desc[:30]}...\n结果: 成功获取并处理了相关数据。"

    if "评估" in prompt and "进度" in prompt:
        return '{"status": "on_track", "reason": "所有子任务按计划推进", "suggestion": "继续执行"}'

    return "已完成任务。"

# 测试
agent = PlanAndExecuteAgent(mock_llm)
result = agent.run("收集并分析近期 AI Agent 领域的技术博客")
print(result)

# 查看执行历史
print(f"\n执行历史记录: {len(agent.history)} 次")
print(json.dumps(agent.history[-1]["results"], ensure_ascii=False, indent=2))

十、关键设计模式总结

10.1 三种执行策略对比

策略 适用场景 优点 缺点
Plan-then-Execute 任务明确、步骤清晰 简单可控 无法应对变化
Plan-while-Execute 探索式任务 灵活性高 实现复杂
Hierarchical Planning 超长任务 结构化好 递归开销

10.2 重规划触发条件

在实践中,以下几种情况应触发重规划:

  1. 依赖失败的子任务:如果任务 B 依赖任务 A,而 A 失败,B 必须重新规划
  2. 连续失败:同一阶段连续 2+ 个子任务失败
  3. 结果偏离预期:LLM 评估发现执行方向偏离了原始目标
  4. 超时:某个子任务执行时间超过阈值

10.3 工程最佳实践

  1. Plan 持久化:将 Plan 序列化为 JSON 保存,方便断点续传
  2. 子任务上下文窗口控制:每个子任务只传递相关的前序结果,而非全部历史
  3. 并行执行安全:并行执行子任务时,确保工具调用是线程安全的
  4. 幂等性:每个子任务设计为可重入,失败后可以安全重试

十一、扩展方向

这个基础框架还可以向几个方向扩展:

  • 层次化规划(Hierarchical Planning):子任务还可以再分解为孙任务,形成树形规划结构
  • 工具调用集成:在 execute_subtask 中引入 Function Calling/Tool Use,让子任务可以调用外部 API
  • 人类反馈循环:在关键决策点暂停,向人类请求确认或澄清
  • 记忆系统集成:将执行结果持久化到记忆系统,供未来的计划参考

结语

Plan-and-Execute 架构将"思考"和"行动"分离,让 AI 智能体从"一步一看"的短视模式,升级为"全局规划+分步执行+动态调整"的成熟模式。本文实现了完整的行为规划系统,包含任务分解、DAG 调度、自适应执行和监控重规划四个核心模块,代码可以直接集成到你的 Agent 项目中使用。


📚 延伸阅读

如果你对 AI Agent 的实战用法感兴趣,推荐阅读我的另一篇文章:

👉 DeepSeek 实战指南:提示词工程、API 集成与效率提升全攻略

这篇文章系统地拆解了 DeepSeek 的提示词工程技巧、API 封装方法以及日常效率提升场景,全文代码可直接运行,适合已经上手 DeepSeek 但希望更高效使用的开发者。


本文是"手写 AI 系统"系列文章之一。该系列从零实现 AI 系统中的关键组件,涵盖 RAG、Agent、Function Calling、MCP 等核心技术,帮助你深入理解底层原理,构建属于自己的 AI 工具。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐