AI Agent 多任务处理:并行编排、状态隔离与失败恢复的工程实践
AI Agent 多任务处理:并行编排、状态隔离与失败恢复的工程实践
一、从串行到并行:Agent 系统的任务瓶颈
在 AI Agent 系统中,单任务串行执行是最简单的实现方式,但也是性能最差的。一个典型的数据处理 Agent 流水线包含:数据获取、清洗、分析、报告生成四个步骤。串行执行时,每个步骤必须等前一步完成,总耗时是各步骤之和。
实际生产中的痛点更具体:一个金融风控 Agent 需要同时调用征信查询、行为分析、关联网络扫描三个子任务。串行执行耗时约 12 秒,而用户对风控决策的容忍上限是 3 秒。并行执行三个子任务后,总耗时降至最慢子任务的耗时(约 2.8 秒),满足 SLA 要求。
但并行编排引入了新的工程问题:子任务之间的状态如何隔离?部分失败时如何恢复?结果如何聚合?这些问题的处理质量直接决定 Agent 系统在生产环境的可靠性。
二、并行编排的架构模型:Fan-out/Fan-in 与 DAG 调度
Agent 多任务处理的核心架构是 Fan-out/Fan-in 模式:将一个主任务拆分为多个子任务并行执行(Fan-out),然后收集结果并聚合(Fan-in)。更复杂的场景需要 DAG(有向无环图)调度,处理子任务之间的依赖关系。
graph TB
A[主任务:风控决策] --> B[子任务1:征信查询]
A --> C[子任务2:行为分析]
A --> D[子任务3:关联网络扫描]
B --> E[结果聚合器]
C --> E
D --> E
E --> F{全部成功?}
F -->|是| G[生成风控报告]
F -->|否| H[失败恢复策略]
H --> I[重试 / 降级 / 部分结果]
style A fill:#e1f5fe
style E fill:#fff3e0
style F fill:#ffebee
style G fill:#e8f5e9
关键设计要素:
- 状态隔离:每个子任务拥有独立的上下文(Context),互不污染。子任务的中间状态通过消息传递而非共享内存交互。
- 超时控制:每个子任务设置独立超时,避免单个慢任务拖垮整体。主任务设置全局超时作为兜底。
- 失败策略:区分可重试失败(网络超时、限流)和不可重试失败(参数错误、权限拒绝),分别处理。
三、生产级代码:基于 Python 的并行 Agent 编排框架
以下代码实现了一个支持并行编排、状态隔离、失败恢复的 Agent 任务调度器。
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Coroutine
import logging
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
TIMEOUT = "timeout"
@dataclass
class TaskResult:
"""子任务执行结果,包含状态、返回值和元信息"""
task_id: str
status: TaskStatus
value: Any = None
error: str | None = None
duration_ms: float = 0
retry_count: int = 0
@dataclass
class TaskSpec:
"""子任务定义,包含执行函数、超时和重试策略"""
task_id: str
func: Callable[..., Coroutine]
args: tuple = ()
kwargs: dict = field(default_factory=dict)
timeout_sec: float = 10.0
max_retries: int = 2
retry_delay_sec: float = 1.0
# 是否为关键任务:关键任务失败则整个编排失败
critical: bool = True
class AgentOrchestrator:
"""Agent 多任务并行编排器,支持状态隔离与失败恢复"""
def __init__(self, global_timeout_sec: float = 30.0):
self.global_timeout_sec = global_timeout_sec
self._results: dict[str, TaskResult] = {}
async def execute_parallel(
self, tasks: list[TaskSpec]
) -> dict[str, TaskResult]:
"""并行执行多个子任务,返回每个任务的结果"""
start = time.monotonic()
# 为每个子任务创建独立的 asyncio.Task,实现状态隔离
coroutines = [self._execute_with_retry(t) for t in tasks]
# 使用 asyncio.gather 收集结果,不因单个失败取消其他任务
results = await asyncio.gather(*coroutines, return_exceptions=True)
for i, result in enumerate(results):
task_id = tasks[i].task_id
if isinstance(result, Exception):
# gather 捕获的异常,转为 TaskResult
self._results[task_id] = TaskResult(
task_id=task_id,
status=TaskStatus.FAILED,
error=str(result),
duration_ms=(time.monotonic() - start) * 1000,
)
else:
self._results[task_id] = result
# 检查关键任务是否全部成功
critical_failed = [
t.task_id for t in tasks
if t.critical
and self._results.get(t.task_id)
and self._results[t.task_id].status != TaskStatus.SUCCESS
]
if critical_failed:
logger.error(
"关键任务失败: %s,编排终止", critical_failed
)
return self._results
async def _execute_with_retry(self, spec: TaskSpec) -> TaskResult:
"""带重试和超时的单任务执行"""
start = time.monotonic()
last_error: str | None = None
for attempt in range(spec.max_retries + 1):
try:
# 每次重试使用独立的超时控制
value = await asyncio.wait_for(
spec.func(*spec.args, **spec.kwargs),
timeout=spec.timeout_sec,
)
return TaskResult(
task_id=spec.task_id,
status=TaskStatus.SUCCESS,
value=value,
duration_ms=(time.monotonic() - start) * 1000,
retry_count=attempt,
)
except asyncio.TimeoutError:
last_error = f"超时({spec.timeout_sec}s)"
logger.warning(
"任务 %s 第 %d 次超时", spec.task_id, attempt + 1
)
except Exception as e:
last_error = str(e)
logger.warning(
"任务 %s 第 %d 次失败: %s",
spec.task_id, attempt + 1, e,
)
# 重试前等待(最后一次不需要)
if attempt < spec.max_retries:
await asyncio.sleep(spec.retry_delay_sec)
return TaskResult(
task_id=spec.task_id,
status=TaskStatus.FAILED,
error=last_error,
duration_ms=(time.monotonic() - start) * 1000,
retry_count=spec.max_retries,
)
def get_summary(self) -> dict[str, Any]:
"""获取执行摘要,用于监控和日志"""
total = len(self._results)
success = sum(
1 for r in self._results.values()
if r.status == TaskStatus.SUCCESS
)
return {
"total": total,
"success": success,
"failed": total - success,
"success_rate": f"{success / total:.1%}" if total else "N/A",
"details": {
tid: {"status": r.status.value, "duration_ms": round(r.duration_ms)}
for tid, r in self._results.items()
},
}
# ========== 使用示例:风控决策 Agent ==========
async def query_credit(user_id: str) -> dict:
"""模拟征信查询"""
await asyncio.sleep(1.5)
return {"user_id": user_id, "score": 720, "level": "A"}
async def analyze_behavior(user_id: str) -> dict:
"""模拟行为分析"""
await asyncio.sleep(2.0)
return {"user_id": user_id, "risk_tags": [], "anomaly_score": 0.12}
async def scan_network(user_id: str) -> dict:
"""模拟关联网络扫描"""
await asyncio.sleep(2.5)
return {"user_id": user_id, "linked_accounts": 3, "fraud_score": 0.05}
async def main():
orchestrator = AgentOrchestrator(global_timeout_sec=15.0)
tasks = [
TaskSpec(
task_id="credit_query",
func=query_credit,
args=("user_001",),
timeout_sec=5.0,
critical=True,
),
TaskSpec(
task_id="behavior_analysis",
func=analyze_behavior,
args=("user_001",),
timeout_sec=5.0,
critical=True,
),
TaskSpec(
task_id="network_scan",
func=scan_network,
args=("user_001",),
timeout_sec=5.0,
# 非关键任务:失败不影响整体决策
critical=False,
),
]
results = await orchestrator.execute_parallel(tasks)
summary = orchestrator.get_summary()
logger.info("执行摘要: %s", summary)
if __name__ == "__main__":
asyncio.run(main())
核心设计点:
- 状态隔离:每个子任务通过独立的
asyncio.wait_for执行,超时互不影响。TaskResult数据结构独立存储每个任务的状态。 - 失败恢复:通过
max_retries和retry_delay_sec控制重试策略。区分关键任务和非关键任务,非关键任务失败不阻断整体流程。 - 结果聚合:
asyncio.gather(return_exceptions=True)确保单个任务异常不会取消其他任务,所有结果统一收集。
四、并行编排的代价与适用边界
4.1 并行度不是越高越好
并行子任务数过多会导致资源竞争。LLM API 调用通常有并发限制(如 OpenAI 的 RPM/TPM 限制),无限制并行会触发限流,反而增加重试开销。建议根据 API 的并发上限设置信号量(asyncio.Semaphore)控制并发度。
4.2 子任务间有依赖时需 DAG 调度
Fan-out/Fan-in 模型假设子任务之间无依赖。如果子任务 B 依赖子任务 A 的输出,需要 DAG 调度器(如 Prefect、Airflow 的思路)。DAG 调度的工程复杂度显著高于简单并行,需要处理拓扑排序、循环依赖检测、中间结果传递等问题。
4.3 部分失败的业务语义
非关键任务失败后,聚合结果中缺少该部分数据。下游消费方必须能处理不完整结果,否则需要降级策略(如用默认值填充)。这个业务语义问题无法在编排层解决,必须在 Agent 的业务逻辑层设计。
4.4 可观测性要求
并行任务的调试难度远高于串行。每个子任务的开始时间、结束时间、重试次数必须记录,否则生产事故排查无从下手。get_summary() 方法提供基础信息,生产环境应接入分布式追踪(如 OpenTelemetry)。
4.5 适用与禁用场景
| 场景 | 是否适用 | 原因 |
|---|---|---|
| 多个独立 API 并行调用 | 适用 | 无依赖,收益明确 |
| LLM 多轮对话 | 不适用 | 前后轮次有依赖 |
| 数据 ETL 流水线 | 看情况 | 有依赖时需 DAG |
| 实时决策(SLA < 1s) | 适用 | 并行缩短耗时 |
| 批量离线处理 | 不适用 | 串行更简单,并行收益低 |
五、总结
Agent 多任务并行编排的核心收益是降低整体耗时,核心代价是工程复杂度和调试难度。Fan-out/Fan-in 模型适合子任务无依赖的场景,有依赖时需引入 DAG 调度。工程实现上,状态隔离、超时控制、失败恢复、关键任务标记是四个必须覆盖的要素。并行度需要根据下游 API 的并发限制做约束,避免触发限流。可观测性不是可选项,而是生产环境的基本要求。
更多推荐
所有评论(0)