AI Agent 多任务处理:并行编排、状态隔离与失败恢复的工程实践

一、从串行到并行:Agent 系统的任务瓶颈

在 AI Agent 系统中,单任务串行执行是最简单的实现方式,但也是性能最差的。一个典型的数据处理 Agent 流水线包含:数据获取、清洗、分析、报告生成四个步骤。串行执行时,每个步骤必须等前一步完成,总耗时是各步骤之和。

实际生产中的痛点更具体:一个金融风控 Agent 需要同时调用征信查询、行为分析、关联网络扫描三个子任务。串行执行耗时约 12 秒,而用户对风控决策的容忍上限是 3 秒。并行执行三个子任务后,总耗时降至最慢子任务的耗时(约 2.8 秒),满足 SLA 要求。

但并行编排引入了新的工程问题:子任务之间的状态如何隔离?部分失败时如何恢复?结果如何聚合?这些问题的处理质量直接决定 Agent 系统在生产环境的可靠性。

二、并行编排的架构模型:Fan-out/Fan-in 与 DAG 调度

Agent 多任务处理的核心架构是 Fan-out/Fan-in 模式:将一个主任务拆分为多个子任务并行执行(Fan-out),然后收集结果并聚合(Fan-in)。更复杂的场景需要 DAG(有向无环图)调度,处理子任务之间的依赖关系。

graph TB
    A[主任务:风控决策] --> B[子任务1:征信查询]
    A --> C[子任务2:行为分析]
    A --> D[子任务3:关联网络扫描]
    B --> E[结果聚合器]
    C --> E
    D --> E
    E --> F{全部成功?}
    F -->|是| G[生成风控报告]
    F -->|否| H[失败恢复策略]
    H --> I[重试 / 降级 / 部分结果]

    style A fill:#e1f5fe
    style E fill:#fff3e0
    style F fill:#ffebee
    style G fill:#e8f5e9

关键设计要素:

  • 状态隔离:每个子任务拥有独立的上下文(Context),互不污染。子任务的中间状态通过消息传递而非共享内存交互。
  • 超时控制:每个子任务设置独立超时,避免单个慢任务拖垮整体。主任务设置全局超时作为兜底。
  • 失败策略:区分可重试失败(网络超时、限流)和不可重试失败(参数错误、权限拒绝),分别处理。

三、生产级代码:基于 Python 的并行 Agent 编排框架

以下代码实现了一个支持并行编排、状态隔离、失败恢复的 Agent 任务调度器。

import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Coroutine
import logging

logger = logging.getLogger(__name__)


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"


@dataclass
class TaskResult:
    """子任务执行结果,包含状态、返回值和元信息"""
    task_id: str
    status: TaskStatus
    value: Any = None
    error: str | None = None
    duration_ms: float = 0
    retry_count: int = 0


@dataclass
class TaskSpec:
    """子任务定义,包含执行函数、超时和重试策略"""
    task_id: str
    func: Callable[..., Coroutine]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    timeout_sec: float = 10.0
    max_retries: int = 2
    retry_delay_sec: float = 1.0
    # 是否为关键任务:关键任务失败则整个编排失败
    critical: bool = True


class AgentOrchestrator:
    """Agent 多任务并行编排器,支持状态隔离与失败恢复"""

    def __init__(self, global_timeout_sec: float = 30.0):
        self.global_timeout_sec = global_timeout_sec
        self._results: dict[str, TaskResult] = {}

    async def execute_parallel(
        self, tasks: list[TaskSpec]
    ) -> dict[str, TaskResult]:
        """并行执行多个子任务,返回每个任务的结果"""
        start = time.monotonic()
        # 为每个子任务创建独立的 asyncio.Task,实现状态隔离
        coroutines = [self._execute_with_retry(t) for t in tasks]
        # 使用 asyncio.gather 收集结果,不因单个失败取消其他任务
        results = await asyncio.gather(*coroutines, return_exceptions=True)

        for i, result in enumerate(results):
            task_id = tasks[i].task_id
            if isinstance(result, Exception):
                # gather 捕获的异常,转为 TaskResult
                self._results[task_id] = TaskResult(
                    task_id=task_id,
                    status=TaskStatus.FAILED,
                    error=str(result),
                    duration_ms=(time.monotonic() - start) * 1000,
                )
            else:
                self._results[task_id] = result

        # 检查关键任务是否全部成功
        critical_failed = [
            t.task_id for t in tasks
            if t.critical
            and self._results.get(t.task_id)
            and self._results[t.task_id].status != TaskStatus.SUCCESS
        ]
        if critical_failed:
            logger.error(
                "关键任务失败: %s,编排终止", critical_failed
            )

        return self._results

    async def _execute_with_retry(self, spec: TaskSpec) -> TaskResult:
        """带重试和超时的单任务执行"""
        start = time.monotonic()
        last_error: str | None = None

        for attempt in range(spec.max_retries + 1):
            try:
                # 每次重试使用独立的超时控制
                value = await asyncio.wait_for(
                    spec.func(*spec.args, **spec.kwargs),
                    timeout=spec.timeout_sec,
                )
                return TaskResult(
                    task_id=spec.task_id,
                    status=TaskStatus.SUCCESS,
                    value=value,
                    duration_ms=(time.monotonic() - start) * 1000,
                    retry_count=attempt,
                )
            except asyncio.TimeoutError:
                last_error = f"超时({spec.timeout_sec}s)"
                logger.warning(
                    "任务 %s 第 %d 次超时", spec.task_id, attempt + 1
                )
            except Exception as e:
                last_error = str(e)
                logger.warning(
                    "任务 %s 第 %d 次失败: %s",
                    spec.task_id, attempt + 1, e,
                )

            # 重试前等待(最后一次不需要)
            if attempt < spec.max_retries:
                await asyncio.sleep(spec.retry_delay_sec)

        return TaskResult(
            task_id=spec.task_id,
            status=TaskStatus.FAILED,
            error=last_error,
            duration_ms=(time.monotonic() - start) * 1000,
            retry_count=spec.max_retries,
        )

    def get_summary(self) -> dict[str, Any]:
        """获取执行摘要,用于监控和日志"""
        total = len(self._results)
        success = sum(
            1 for r in self._results.values()
            if r.status == TaskStatus.SUCCESS
        )
        return {
            "total": total,
            "success": success,
            "failed": total - success,
            "success_rate": f"{success / total:.1%}" if total else "N/A",
            "details": {
                tid: {"status": r.status.value, "duration_ms": round(r.duration_ms)}
                for tid, r in self._results.items()
            },
        }


# ========== 使用示例:风控决策 Agent ==========

async def query_credit(user_id: str) -> dict:
    """模拟征信查询"""
    await asyncio.sleep(1.5)
    return {"user_id": user_id, "score": 720, "level": "A"}


async def analyze_behavior(user_id: str) -> dict:
    """模拟行为分析"""
    await asyncio.sleep(2.0)
    return {"user_id": user_id, "risk_tags": [], "anomaly_score": 0.12}


async def scan_network(user_id: str) -> dict:
    """模拟关联网络扫描"""
    await asyncio.sleep(2.5)
    return {"user_id": user_id, "linked_accounts": 3, "fraud_score": 0.05}


async def main():
    orchestrator = AgentOrchestrator(global_timeout_sec=15.0)
    tasks = [
        TaskSpec(
            task_id="credit_query",
            func=query_credit,
            args=("user_001",),
            timeout_sec=5.0,
            critical=True,
        ),
        TaskSpec(
            task_id="behavior_analysis",
            func=analyze_behavior,
            args=("user_001",),
            timeout_sec=5.0,
            critical=True,
        ),
        TaskSpec(
            task_id="network_scan",
            func=scan_network,
            args=("user_001",),
            timeout_sec=5.0,
            # 非关键任务:失败不影响整体决策
            critical=False,
        ),
    ]
    results = await orchestrator.execute_parallel(tasks)
    summary = orchestrator.get_summary()
    logger.info("执行摘要: %s", summary)


if __name__ == "__main__":
    asyncio.run(main())

核心设计点:

  • 状态隔离:每个子任务通过独立的 asyncio.wait_for 执行,超时互不影响。TaskResult 数据结构独立存储每个任务的状态。
  • 失败恢复:通过 max_retriesretry_delay_sec 控制重试策略。区分关键任务和非关键任务,非关键任务失败不阻断整体流程。
  • 结果聚合asyncio.gather(return_exceptions=True) 确保单个任务异常不会取消其他任务,所有结果统一收集。

四、并行编排的代价与适用边界

4.1 并行度不是越高越好

并行子任务数过多会导致资源竞争。LLM API 调用通常有并发限制(如 OpenAI 的 RPM/TPM 限制),无限制并行会触发限流,反而增加重试开销。建议根据 API 的并发上限设置信号量(asyncio.Semaphore)控制并发度。

4.2 子任务间有依赖时需 DAG 调度

Fan-out/Fan-in 模型假设子任务之间无依赖。如果子任务 B 依赖子任务 A 的输出,需要 DAG 调度器(如 Prefect、Airflow 的思路)。DAG 调度的工程复杂度显著高于简单并行,需要处理拓扑排序、循环依赖检测、中间结果传递等问题。

4.3 部分失败的业务语义

非关键任务失败后,聚合结果中缺少该部分数据。下游消费方必须能处理不完整结果,否则需要降级策略(如用默认值填充)。这个业务语义问题无法在编排层解决,必须在 Agent 的业务逻辑层设计。

4.4 可观测性要求

并行任务的调试难度远高于串行。每个子任务的开始时间、结束时间、重试次数必须记录,否则生产事故排查无从下手。get_summary() 方法提供基础信息,生产环境应接入分布式追踪(如 OpenTelemetry)。

4.5 适用与禁用场景

场景 是否适用 原因
多个独立 API 并行调用 适用 无依赖,收益明确
LLM 多轮对话 不适用 前后轮次有依赖
数据 ETL 流水线 看情况 有依赖时需 DAG
实时决策(SLA < 1s) 适用 并行缩短耗时
批量离线处理 不适用 串行更简单,并行收益低

五、总结

Agent 多任务并行编排的核心收益是降低整体耗时,核心代价是工程复杂度和调试难度。Fan-out/Fan-in 模型适合子任务无依赖的场景,有依赖时需引入 DAG 调度。工程实现上,状态隔离、超时控制、失败恢复、关键任务标记是四个必须覆盖的要素。并行度需要根据下游 API 的并发限制做约束,避免触发限流。可观测性不是可选项,而是生产环境的基本要求。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐