手写 Plan-and-Execute:从零实现 AI 智能体行为规划系统
一、引言:为什么需要行为规划?
当前的大语言模型(LLM)Agent 大多采用 ReAct(Reasoning + Acting) 范式:接收到用户问题后,模型输出思考链条,然后调用工具,观察结果,再继续推理。这种"一个回合一个动作"的方式在简单任务上表现良好,但面对复杂、多步骤、需要长期规划的请求时,就会暴露出明显的缺陷:
- 缺乏全局视野:只见当前这一步,不知道整体的执行路线
- 容易陷入死循环:在某个子任务上反复尝试相同操作
- 无法应对动态变化:一旦工具返回意外结果,不知道如何调整计划
- 上下文窗口浪费:每次思考都带着全部历史,token 消耗巨大
Plan-and-Execute(计划与执行分离)架构正是为了解决这些问题而生的。它的核心思想很简单:把"想"和"做"分开——规划模块负责制定全局方案,执行模块负责按计划一步步落实,监控模块则持续观察执行状态,在必要时触发重规划。
本文将从零实现一个完整的 Plan-and-Execute 智能体行为规划系统,包含:
1. 任务分解与依赖图构建
2. 子任务调度器
3. Plan-then-Execute 与 Plan-while-Execute 两种执行模式
4. 执行监控与动态重规划
5. 完整可运行的 Python 代码
二、整体架构设计
我们先来看系统的整体架构:
用户请求
│
▼
┌─────────────────┐
│ Task Planner │ ← 任务分解器:分析请求 → 生成子任务列表 + 依赖关系
│ (LLM + Parser) │
└────────┬────────┘
│ Plan: [子任务1 → 子任务2 → 子任务3 ...]
▼
┌─────────────────┐
│ Task Scheduler │ ← 调度器:根据依赖图确定执行顺序
│ (DAG 拓扑排序) │
└────────┬────────┘
▼
┌─────────────────────┐
│ Executor Engine │ ← 执行引擎:按顺序/并行执行子任务
│ (LLM per subtask) │
└────────┬────────────┘
│ Result per subtask
▼
┌─────────────────┐
│ Monitor & │ ← 监控器:检查执行结果,决策是否重计划
│ Re-planner │
└────────┬────────┘
│ ✅ Done or 🔄 Re-plan
▼
Final Response
这样的分层设计带来了几个关键好处:
- 模块化:每一层职责清晰,可以独立替换或升级
- 可观测性:可以记录和可视化完整的"计划 → 执行 → 结果"链条
- 健壮性:子任务失败不会导致整个计划崩溃,可以局部重试或重规划
- 效率:规划阶段只需一次 LLM 调用,执行阶段每个子任务一个轻量级 LLM 调用
三、核心数据结构
首先定义我们的核心数据模型,让代码清晰可读:
from dataclasses import dataclass, field
from typing import Optional
import enum
class SubTaskStatus(enum.Enum):
PENDING = "pending" # 等待执行
IN_PROGRESS = "in_progress" # 正在执行
COMPLETED = "completed" # 执行成功
FAILED = "failed" # 执行失败
SKIPPED = "skipped" # 跳过(依赖失败)
@dataclass
class SubTask:
"""单个子任务"""
id: str # 唯一标识,如 "task_1"
description: str # 任务描述
depends_on: list[str] = field(default_factory=list) # 依赖的子任务ID列表
status: SubTaskStatus = SubTaskStatus.PENDING
result: Optional[str] = None # 执行结果
error: Optional[str] = None # 错误信息
@dataclass
class Plan:
"""完整执行计划"""
goal: str # 原始用户目标
subtasks: list[SubTask] = field(default_factory=list)
current_index: int = 0 # 当前执行到第几个子任务
created_at: float = 0.0
updated_at: float = 0.0
四、任务分解器(Task Planner)
任务分解器是系统的"大脑"。它接收用户的自然语言请求,利用 LLM 将其拆解为一组有依赖关系的子任务。
4.1 Prompt 设计
分解器的 Prompt 非常关键。我们需要引导 LLM 输出结构化的 JSON 格式:
PLANNER_PROMPT = """你是一位专业的任务规划专家。请将用户的请求分解为一系列有依赖关系的子任务。
要求:
1. 每个子任务必须是一个可独立执行的原子操作
2. 明确标注子任务之间的依赖关系(如果任务B需要任务A的结果才能执行,则B依赖A)
3. 子任务数量在 3-8 个之间
4. 使用 JSON 格式输出
输出格式:
```json
{
"subtasks": [
{
"id": "task_1",
"description": "子任务描述",
"depends_on": []
},
{
"id": "task_2",
"description": "子任务描述",
"depends_on": ["task_1"]
}
]
}
用户的请求:{goal}
"""
### 4.2 Planner 实现
```python
import json
import re
from typing import Optional
class TaskPlanner:
"""任务分解器"""
def __init__(self, llm_callable):
self.llm = llm_callable # 任意 LLM 调用函数
def parse_plan(self, llm_response: str) -> dict:
"""从 LLM 返回中提取 JSON plan"""
# 尝试直接解析
try:
return json.loads(llm_response)
except json.JSONDecodeError:
pass
# 尝试从 markdown 代码块中提取
match = re.search(r'```(?:json)?\s*([\s\S]*?)```', llm_response)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
pass
# 尝试从花括号提取
match = re.search(r'\{[\s\S]*\}', llm_response)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
raise ValueError(f"无法从 LLM 响应中解析 plan: {llm_response[:200]}")
def create_plan(self, goal: str) -> Plan:
"""根据用户目标创建执行计划"""
prompt = PLANNER_PROMPT.format(goal=goal)
response = self.llm(prompt)
plan_data = self.parse_plan(response)
subtasks = []
for item in plan_data["subtasks"]:
subtasks.append(SubTask(
id=item["id"],
description=item["description"],
depends_on=item.get("depends_on", [])
))
import time
return Plan(
goal=goal,
subtasks=subtasks,
created_at=time.time(),
updated_at=time.time()
)
4.3 解析鲁棒性
上面的 parse_plan 方法做了三层兜底:
1. 直接 JSON 解析
2. 从 markdown code block 提取
3. 从第一个花括号对提取
这是因为 LLM 输出格式经常不稳定,这种分级解析策略可以大幅提高成功率。
五、任务调度器(Task Scheduler)
拿到子任务列表后,我们需要排定执行顺序。由于子任务之间存在依赖关系,这是一个典型的有向无环图(DAG)拓扑排序问题。
from collections import deque, defaultdict
class TaskScheduler:
"""基于 DAG 拓扑排序的任务调度器"""
def schedule(self, plan: Plan) -> list[list[SubTask]]:
"""
返回按执行阶段分组的子任务列表。
每一组内的子任务可以并行执行。
"""
# 构建入度表和邻接表
in_degree = {task.id: 0 for task in plan.subtasks}
graph = defaultdict(list)
for task in plan.subtasks:
for dep in task.depends_on:
graph[dep].append(task.id)
in_degree[task.id] = in_degree.get(task.id, 0) + 1
# 查找不在依赖列表中的 ID(孤立节点)
all_ids = {task.id for task in plan.subtasks}
for dep_list in graph.values():
for dep in dep_list:
if dep not in all_ids:
raise ValueError(f"子任务 {dep} 依赖了不存在的任务")
# Kahn 算法实现拓扑排序
queue = deque([tid for tid, deg in in_degree.items() if deg == 0])
stages = []
while queue:
stage = []
for _ in range(len(queue)):
tid = queue.popleft()
task = next(t for t in plan.subtasks if t.id == tid)
stage.append(task)
for neighbor in graph[tid]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
stages.append(stage)
# 检查是否有环
scheduled_count = sum(len(s) for s in stages)
if scheduled_count != len(plan.subtasks):
raise ValueError("任务依赖关系存在循环依赖!")
return stages
def get_execution_order(self, plan: Plan) -> list[str]:
"""获取平铺的执行顺序(用于日志和展示)"""
stages = self.schedule(plan)
order = []
for i, stage in enumerate(stages):
for task in stage:
order.append(task.id)
return order
def print_plan(self, plan: Plan):
"""打印可读的执行计划"""
stages = self.schedule(plan)
print(f"📋 执行计划:{plan.goal}")
print(f" 共 {len(plan.subtasks)} 个子任务,{len(stages)} 个执行阶段\n")
for i, stage in enumerate(stages):
parallel = " ⚡ 并行" if len(stage) > 1 else ""
print(f" 阶段 {i+1}{parallel}:")
for task in stage:
deps = f" (依赖: {', '.join(task.depends_on)})" if task.depends_on else ""
print(f" ├─ {task.id}: {task.description}{deps}")
print()
调度器的关键输出是执行阶段——同一阶段内的子任务彼此独立,可以并行执行。这对于有多个可用 Agent 的场景尤其有价值。
六、执行引擎(Executor)
执行引擎负责实际执行每个子任务。我设计了两种模式:
6.1 Plan-then-Execute(先计划后执行)
这是最基本的模式:先完整规划,然后按顺序依次执行。
class Executor:
def __init__(self, llm_callable, tools: dict = None):
self.llm = llm_callable
self.tools = tools or {} # 可用的工具集
def execute_subtask(self, task: SubTask, context: dict) -> str:
"""执行单个子任务"""
prompt = f"""你正在执行一个子任务。
子任务描述:{task.description}
已经完成的工作:
{self._format_context(context)}
请完成这个子任务,并输出结果。"""
result = self.llm(prompt)
return result
def _format_context(self, context: dict) -> str:
"""格式化上下文供 LLM 使用"""
if not context:
return "(暂无)"
lines = []
for task_id, result in context.items():
lines.append(f"[{task_id}]: {result[:200]}")
return "\n".join(lines)
6.2 Plan-while-Execute(边计划边执行)
对于复杂任务,我更推荐这种模式:先制定粗略计划,每完成一个阶段后结合新信息微调后续计划。
class AdaptiveExecutor(Executor):
"""自适应执行器:每完成一批子任务后,可以调整后续计划"""
def __init__(self, planner: TaskPlanner, llm_callable, tools: dict = None):
super().__init__(llm_callable, tools)
self.planner = planner
def execute_and_monitor(
self, plan: Plan, scheduler: TaskScheduler, max_replans: int = 3
) -> Plan:
stages = scheduler.schedule(plan)
context = {}
replan_count = 0
for stage_idx, stage in enumerate(stages):
print(f"\n▶ 执行阶段 {stage_idx + 1}/{len(stages)}")
for task in stage:
print(f" 执行 {task.id}: {task.description[:50]}...")
task.status = SubTaskStatus.IN_PROGRESS
try:
result = self.execute_subtask(task, context)
task.status = SubTaskStatus.COMPLETED
task.result = result
context[task.id] = result
print(f" ✅ {task.id} 完成")
except Exception as e:
task.status = SubTaskStatus.FAILED
task.error = str(e)
print(f" ❌ {task.id} 失败: {e}")
# 决定是否需要重规划
if replan_count < max_replans and self._should_replan(plan, task):
self._replan(plan, context, stage_idx)
replan_count += 1
# 重新计算执行阶段
stages = scheduler.schedule(plan)
break # 重进 stage for 循环
plan.updated_at = __import__('time').time()
return plan
def _should_replan(self, plan: Plan, failed_task: SubTask) -> bool:
"""判断是否需要重规划"""
# 如果失败的子任务被后续任务依赖,需要重规划
for task in plan.subtasks:
if failed_task.id in task.depends_on:
return True
# 如果失败次数过多,触发重规划
failed_count = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.FAILED)
return failed_count >= 2
def _replan(self, plan: Plan, context: dict, current_stage: int):
"""对未执行的部分进行重规划"""
# 找出未执行的任务
remaining = [
t for t in plan.subtasks
if t.status == SubTaskStatus.PENDING
]
remaining_desc = [t.description for t in remaining]
context_summary = {k: v[:100] for k, v in context.items()}
replan_prompt = f"""原始目标:{plan.goal}
已完成的任务及其结果:
{json.dumps(context_summary, ensure_ascii=False, indent=2)}
以下子任务尚未完成:
{json.dumps(remaining_desc, ensure_ascii=False, indent=2)}
由于前面某个子任务失败,请重新规划剩余的子任务。
考虑已完成的工作和失败原因,输出新的 JSON plan。"""
response = self.llm(replan_prompt)
new_plan_data = self.planner.parse_plan(response)
# 用新计划替换未执行的任务
new_subtasks = [
t for t in plan.subtasks
if t.status != SubTaskStatus.PENDING
]
for item in new_plan_data["subtasks"]:
new_subtasks.append(SubTask(
id=f"{item.id}_replan",
description=item["description"],
depends_on=item.get("depends_on", [])
))
plan.subtasks = new_subtasks
print(f" 🔄 已重规划,剩余 {len(plan.subtasks) - sum(1 for t in plan.subtasks if t.status in [SubTaskStatus.COMPLETED, SubTaskStatus.FAILED])} 个子任务")
七、监控与重规划引擎(Monitor)
监控器是保持系统鲁棒性的关键组件。它负责:
- 检查子任务结果是否与预期一致
- 检测是否偏离了原始目标
- 在必要时触发重规划或向用户请求澄清
class Monitor:
def __init__(self, llm_callable):
self.llm = llm_callable
def evaluate_progress(self, plan: Plan) -> dict:
"""评估当前执行进度和健康状态"""
total = len(plan.subtasks)
completed = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.COMPLETED)
failed = sum(1 for t in plan.subtasks if t.status == SubTaskStatus.FAILED)
progress_pct = (completed / total * 100) if total > 0 else 0
# 用 LLM 检查整体进度是否合理
check_prompt = f"""请评估以下 AI Agent 的执行进度:
目标:{plan.goal}
子任务状态:
"""
for task in plan.subtasks:
status_icon = {
SubTaskStatus.COMPLETED: "✅",
SubTaskStatus.FAILED: "❌",
SubTaskStatus.IN_PROGRESS: "🔄",
SubTaskStatus.PENDING: "⏳"
}.get(task.status, "❓")
check_prompt += f"{status_icon} {task.id}: {task.description}\n"
if task.result:
check_prompt += f" 结果: {task.result[:150]}...\n"
check_prompt += """
请判断:
1. 当前进展是否正常?
2. 是否需要调整策略?
3. 是否应该继续执行还是停下来修改计划?
输出 JSON:
{"status": "on_track|needs_adjustment|off_track", "reason": "xxx", "suggestion": "xxx"}
"""
response = self.llm(check_prompt)
try:
evaluation = json.loads(
re.search(r'\{[\s\S]*\}', response).group(0)
)
except (json.JSONDecodeError, AttributeError):
evaluation = {"status": "on_track", "reason": "解析失败,默认继续"}
return {
"total": total,
"completed": completed,
"failed": failed,
"progress_pct": progress_pct,
"evaluation": evaluation
}
八、组装完整系统
现在把所有组件组装成一个完整的 Plan-and-Execute 引擎:
import json
import time
class PlanAndExecuteAgent:
"""完整的 Plan-and-Execute 智能体"""
def __init__(self, llm_callable, tools: dict = None):
self.planner = TaskPlanner(llm_callable)
self.scheduler = TaskScheduler()
self.executor = AdaptiveExecutor(self.planner, llm_callable, tools)
self.monitor = Monitor(llm_callable)
self.history = [] # 执行历史
def run(self, goal: str, adaptive: bool = True) -> str:
"""执行一个完整的 Plan-and-Execute 流程"""
# 阶段 1:规划
print("=" * 50)
print("🔍 阶段 1: 任务分解")
plan = self.planner.create_plan(goal)
print(f" 目标: {plan.goal}")
print(f" 拆解为 {len(plan.subtasks)} 个子任务")
# 打印执行计划
self.scheduler.print_plan(plan)
# 阶段 2:执行
print("=" * 50)
print("🚀 阶段 2: 执行")
if adaptive:
plan = self.executor.execute_and_monitor(plan, self.scheduler)
else:
stages = self.scheduler.schedule(plan)
for stage in stages:
for task in stage:
task.status = SubTaskStatus.IN_PROGRESS
context = {t.id: t.result for t in plan.subtasks if t.result}
result = self.executor.execute_subtask(task, context)
task.status = SubTaskStatus.COMPLETED
task.result = result
print(f" ✅ {task.id}: {task.description[:40]}...")
# 阶段 3:监控与评估
print("=" * 50)
print("📊 阶段 3: 执行评估")
evaluation = self.monitor.evaluate_progress(plan)
report = {
"status": evaluation["evaluation"].get("status", "unknown"),
"reason": evaluation["evaluation"].get("reason", ""),
"progress": f"{evaluation['completed']}/{evaluation['total']}",
"results": {}
}
for task in plan.subtasks:
report["results"][task.id] = {
"description": task.description,
"status": task.status.value,
"result": task.result[:200] if task.result else None
}
self.history.append(report)
# 生成最终总结
summary = self._generate_summary(plan, evaluation)
return summary
def _generate_summary(self, plan: Plan, evaluation: dict) -> str:
"""生成执行总结"""
completed = evaluation["completed"]
total = evaluation["total"]
lines = [f"## 📋 执行报告:{plan.goal}", ""]
lines.append(f"**状态**: {evaluation['evaluation'].get('status', 'unknown')}")
lines.append(f"**进度**: {completed}/{total} 子任务完成")
lines.append(f"**评估**: {evaluation['evaluation'].get('reason', '无')}")
lines.append("")
if evaluation['evaluation'].get('suggestion'):
lines.append(f"**建议**: {evaluation['evaluation']['suggestion']}")
lines.append("")
lines.append("### 子任务详情")
for task in plan.subtasks:
status_icon = {
SubTaskStatus.COMPLETED: "✅",
SubTaskStatus.FAILED: "❌",
SubTaskStatus.IN_PROGRESS: "🔄",
SubTaskStatus.PENDING: "⏳",
SubTaskStatus.SKIPPED: "⏭️"
}.get(task.status, "❓")
lines.append(f"- {status_icon} **{task.id}**: {task.description}")
if task.result:
lines.append(f" - 结果: {task.result[:100]}")
return "\n".join(lines)
九、完整测试示例
下面用一个"收集和分析技术博客数据"的例子来验证系统:
def mock_llm(prompt: str) -> str:
"""模拟 LLM 响应(用于测试)"""
if "规划专家" in prompt or "分解" in prompt:
return """{
"subtasks": [
{
"id": "task_1",
"description": "搜索互联网上最新的 AI Agent 技术博客",
"depends_on": []
},
{
"id": "task_2",
"description": "提取并总结每篇博客的核心观点和技术要点",
"depends_on": ["task_1"]
},
{
"id": "task_3",
"description": "对比分析各篇博客的异同点",
"depends_on": ["task_2"]
},
{
"id": "task_4",
"description": "生成一份结构化的分析报告",
"depends_on": ["task_3"]
}
]
}"""
if "子任务" in prompt and "执行" in prompt:
task_desc = [line for line in prompt.split('\n') if '子任务描述' in line]
if task_desc:
desc = task_desc[0].split(':')[-1].strip()
else:
desc = "执行子任务"
return f"已完成: {desc[:30]}...\n结果: 成功获取并处理了相关数据。"
if "评估" in prompt and "进度" in prompt:
return '{"status": "on_track", "reason": "所有子任务按计划推进", "suggestion": "继续执行"}'
return "已完成任务。"
# 测试
agent = PlanAndExecuteAgent(mock_llm)
result = agent.run("收集并分析近期 AI Agent 领域的技术博客")
print(result)
# 查看执行历史
print(f"\n执行历史记录: {len(agent.history)} 次")
print(json.dumps(agent.history[-1]["results"], ensure_ascii=False, indent=2))
十、关键设计模式总结
10.1 三种执行策略对比
| 策略 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Plan-then-Execute | 任务明确、步骤清晰 | 简单可控 | 无法应对变化 |
| Plan-while-Execute | 探索式任务 | 灵活性高 | 实现复杂 |
| Hierarchical Planning | 超长任务 | 结构化好 | 递归开销 |
10.2 重规划触发条件
在实践中,以下几种情况应触发重规划:
- 依赖失败的子任务:如果任务 B 依赖任务 A,而 A 失败,B 必须重新规划
- 连续失败:同一阶段连续 2+ 个子任务失败
- 结果偏离预期:LLM 评估发现执行方向偏离了原始目标
- 超时:某个子任务执行时间超过阈值
10.3 工程最佳实践
- Plan 持久化:将 Plan 序列化为 JSON 保存,方便断点续传
- 子任务上下文窗口控制:每个子任务只传递相关的前序结果,而非全部历史
- 并行执行安全:并行执行子任务时,确保工具调用是线程安全的
- 幂等性:每个子任务设计为可重入,失败后可以安全重试
十一、扩展方向
这个基础框架还可以向几个方向扩展:
- 层次化规划(Hierarchical Planning):子任务还可以再分解为孙任务,形成树形规划结构
- 工具调用集成:在 execute_subtask 中引入 Function Calling/Tool Use,让子任务可以调用外部 API
- 人类反馈循环:在关键决策点暂停,向人类请求确认或澄清
- 记忆系统集成:将执行结果持久化到记忆系统,供未来的计划参考
结语
Plan-and-Execute 架构将"思考"和"行动"分离,让 AI 智能体从"一步一看"的短视模式,升级为"全局规划+分步执行+动态调整"的成熟模式。本文实现了完整的行为规划系统,包含任务分解、DAG 调度、自适应执行和监控重规划四个核心模块,代码可以直接集成到你的 Agent 项目中使用。
📚 延伸阅读
如果你对 AI Agent 的实战用法感兴趣,推荐阅读我的另一篇文章:
👉 DeepSeek 实战指南:提示词工程、API 集成与效率提升全攻略
这篇文章系统地拆解了 DeepSeek 的提示词工程技巧、API 封装方法以及日常效率提升场景,全文代码可直接运行,适合已经上手 DeepSeek 但希望更高效使用的开发者。
本文是"手写 AI 系统"系列文章之一。该系列从零实现 AI 系统中的关键组件,涵盖 RAG、Agent、Function Calling、MCP 等核心技术,帮助你深入理解底层原理,构建属于自己的 AI 工具。
更多推荐


所有评论(0)