进阶 AI Agent 工程架构
一套适合落地的架构:FastAPI + Swagger/OpenAPI + MCP + A2A + Queue + Memory + Observability。
FastAPI 适合做 Agent 服务层,因为它基于 Python 类型提示构建 API,并自动生成 OpenAPI schema、Swagger UI /docs 和 ReDoc /redoc。([FastAPI][1])
A2A 负责 Agent 与 Agent 协作,核心包括 Agent Card、Task、Artifact、Streaming、Push Notifications。([GitHub][2])
1. 总体架构
User / App / Web
↓
API Gateway / Auth
↓
FastAPI Agent Service
↓
Supervisor Agent
├── Planner
├── Router
├── Memory Manager
├── Tool Executor
├── Safety Guard
└── Observability
↓
┌───────────────┬───────────────┬───────────────┐
│ A2A Agent │ MCP Tools │ Internal APIs │
│ Research Agent │ Search Tool │ CRM / ERP │
│ Code Agent │ DB Tool │ GIS / IoT │
│ Report Agent │ File Tool │ Payment │
└───────────────┴───────────────┴───────────────┘
↓
Task Store / Vector DB / Redis / Queue
↓
Artifact Output
一句话:
FastAPI 暴露服务,Swagger 描述接口,A2A 调 Agent,MCP 调工具,Queue 跑长任务,Memory 存上下文,Observability 做生产监控。
2. 分层设计
L1:接口层 FastAPI
负责对外暴露:
POST /agent/chat
POST /agent/task
GET /agent/task/{task_id}
GET /agent/stream/{task_id}
GET /.well-known/agent-card.json
GET /openapi.json
GET /docs
FastAPI 会自动生成 OpenAPI schema,Swagger UI 会读取该 schema 生成可交互 API 文档。([FastAPI][3])
L2:Agent 编排层
核心组件:
Supervisor Agent
├── Intent Parser:理解用户目标
├── Planner:拆解任务
├── Router:选择 Agent 或 Tool
├── Executor:执行任务
├── Critic:检查结果
├── Memory:读写上下文
└── Artifact Builder:生成最终结果
典型流程:
用户请求
↓
意图识别
↓
任务拆解
↓
选择 A2A Agent / MCP Tool / 内部 API
↓
执行
↓
校验
↓
生成 Artifact
↓
返回用户
3. A2A 与 MCP 的工程位置
Supervisor Agent
↓ A2A
Specialized Agent
↓ MCP
Tool / DB / API / File / Browser
区别:
| 能力 | A2A | MCP |
|---|---|---|
| 目标 | Agent 调 Agent | Agent 调工具 |
| 对象 | Remote Agent | Tool / Resource |
| 输出 | Task / Artifact | Tool result |
| 适合 | 多 Agent 协作 | 工具标准化 |
A2A 的 Agent Card 是能力发现入口,通常是一个公开 JSON 元数据文档,用来描述 Agent 身份、能力、认证、接口和技能。([Google Codelabs][4])
4. 推荐目录结构
agent-platform/
├── app/
│ ├── main.py
│ ├── api/
│ │ ├── chat.py
│ │ ├── tasks.py
│ │ ├── stream.py
│ │ └── agent_card.py
│ ├── agents/
│ │ ├── supervisor.py
│ │ ├── planner.py
│ │ ├── router.py
│ │ ├── critic.py
│ │ └── memory_agent.py
│ ├── a2a/
│ │ ├── client.py
│ │ ├── server.py
│ │ └── schemas.py
│ ├── mcp/
│ │ ├── client.py
│ │ └── tools.py
│ ├── memory/
│ │ ├── vector_store.py
│ │ └── session_store.py
│ ├── queue/
│ │ ├── worker.py
│ │ └── tasks.py
│ ├── security/
│ │ ├── auth.py
│ │ ├── policy.py
│ │ └── guardrails.py
│ ├── observability/
│ │ ├── logging.py
│ │ ├── tracing.py
│ │ └── metrics.py
│ └── artifacts/
│ └── builder.py
├── tests/
├── docker-compose.yml
├── pyproject.toml
└── README.md
5. 核心数据模型
from pydantic import BaseModel
from typing import Literal, Any
class AgentMessage(BaseModel):
role: Literal["user", "agent", "tool"]
content: str
metadata: dict[str, Any] = {}
class AgentTask(BaseModel):
task_id: str
user_id: str
status: Literal[
"submitted",
"working",
"input_required",
"completed",
"failed"
]
input: AgentMessage
output: dict[str, Any] | None = None
class Artifact(BaseModel):
artifact_id: str
name: str
type: Literal["markdown", "json", "image", "html", "code"]
content: str
6. FastAPI 服务骨架
from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4
app = FastAPI(
title="AI Agent Platform",
description="FastAPI + Swagger + A2A + MCP Agent 工程平台",
version="1.0.0",
)
class ChatRequest(BaseModel):
user_id: str
message: str
@app.get("/.well-known/agent-card.json")
def agent_card():
return {
"name": "City Emergency Supervisor Agent",
"description": "智慧城市应急响应主控 Agent",
"skills": [
{
"id": "emergency_orchestration",
"name": "Emergency Orchestration",
"description": "协调消防、交通、医疗、气象 Agent"
}
],
"capabilities": {
"streaming": True,
"pushNotifications": True
}
}
@app.post("/agent/task")
def create_task(req: ChatRequest):
task_id = str(uuid4())
return {
"task_id": task_id,
"status": "submitted",
"message": "任务已创建"
}
@app.get("/agent/task/{task_id}")
def get_task(task_id: str):
return {
"task_id": task_id,
"status": "working"
}
启动后:
uvicorn app.main:app --reload
访问:
http://127.0.0.1:8000/docs
7. A2A 调度逻辑
class A2ARouter:
def __init__(self, agents: list[dict]):
self.agents = agents
def select_agent(self, task: str) -> dict:
for agent in self.agents:
skills = " ".join(skill["id"] for skill in agent["skills"])
if "traffic" in task and "traffic_control" in skills:
return agent
if "fire" in task and "fire_response" in skills:
return agent
return self.agents[0]
class SupervisorAgent:
def __init__(self, router: A2ARouter):
self.router = router
def plan(self, user_input: str):
return [
{"type": "a2a", "task": "fire_detection"},
{"type": "a2a", "task": "traffic_control"},
{"type": "mcp", "task": "query_gis"},
{"type": "artifact", "task": "build_report"},
]
def run(self, user_input: str):
plan = self.plan(user_input)
results = []
for step in plan:
if step["type"] == "a2a":
agent = self.router.select_agent(step["task"])
results.append({
"agent": agent["name"],
"result": f"{step['task']} completed"
})
elif step["type"] == "mcp":
results.append({
"tool": "GIS Tool",
"result": "地图数据已返回"
})
return results
8. 长任务架构
对于 AI Agent,很多任务不是同步返回的,比如:
深度研究
代码生成
城市应急调度
文件分析
多 Agent 协作
报表生成
推荐使用:
FastAPI
↓
Redis / RabbitMQ / Kafka
↓
Worker
↓
Task Store
↓
SSE / WebSocket / Push Notification
流程:
POST /agent/task
↓
返回 task_id
↓
后台 Worker 执行
↓
GET /agent/task/{task_id}
↓
SSE 推送中间状态
↓
返回 Artifact
9. 流式输出 SSE 示例
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def event_stream(task_id: str):
states = [
{"status": "submitted"},
{"status": "working", "message": "正在分析任务"},
{"status": "working", "message": "正在调用交通 Agent"},
{"status": "working", "message": "正在生成 Artifact"},
{"status": "completed", "artifact": "report.md"},
]
for state in states:
yield f"data: {json.dumps(state, ensure_ascii=False)}\n\n"
await asyncio.sleep(1)
@app.get("/agent/stream/{task_id}")
def stream_task(task_id: str):
return StreamingResponse(
event_stream(task_id),
media_type="text/event-stream"
)
10. 智慧城市应急响应联盟:进阶架构
Citizen / IoT / CCTV / 12345 Hotline
↓
Event Ingestion Gateway
↓
City Emergency Supervisor Agent
↓
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Fire Agent │ Traffic Agent│ Medical Agent│ Weather Agent│
└──────┬───────┴──────┬───────┴──────┬───────┴──────┬───────┘
↓ ↓ ↓ ↓
Fire IoT GIS / CCTV Hospital API Weather API
↓ ↓ ↓ ↓
└──────────────┬──────────────┬──────────────┘
↓
Decision Optimizer
↓
Emergency Artifact
↓
Dispatch / Alert / Dashboard / Report
这里:
A2A:Supervisor 调 Fire / Traffic / Medical / Weather Agent
MCP:各 Agent 调 IoT、GIS、医院、气象、数据库
FastAPI:每个 Agent 的服务接口
Swagger:每个 Agent 的 API 文档
Queue:异步任务与事件流
Vector DB:历史事件与预案记忆
Redis:短期状态与任务缓存
PostgreSQL:任务、审计、Artifact 元数据
11. 生产级必备能力
| 模块 | 必须做什么 |
|---|---|
| Auth | API Key、OAuth2、JWT、mTLS |
| Permission | 不同 Agent 只能访问授权工具 |
| Rate Limit | 限制 Agent 调用频率 |
| Cost Control | 限制 token、工具调用次数、外部 API 成本 |
| Audit Log | 谁调用了谁、输入是什么、结果是什么 |
| Trace | 一次用户请求贯穿多个 Agent 的 trace_id |
| Replay | 失败任务可重放 |
| Human-in-the-loop | 高风险动作必须人工确认 |
| Guardrails | 防注入、防越权、防幻觉输出 |
| Sandbox | 代码执行、文件处理必须隔离 |
| Artifact Store | 文件、报告、图片、JSON 统一存储 |
12. 最推荐的技术栈
| 层 | 推荐 |
|---|---|
| API 服务 | FastAPI |
| API 文档 | Swagger UI / OpenAPI |
| Agent 协议 | A2A |
| 工具协议 | MCP |
| 编排 | LangGraph / 自研状态机 |
| 异步任务 | Celery / Dramatiq / RQ |
| 消息队列 | Redis / RabbitMQ / Kafka |
| 结构化存储 | PostgreSQL |
| 向量数据库 | pgvector / Qdrant / Milvus |
| 缓存 | Redis |
| 文件存储 | S3 / MinIO |
| 监控 | OpenTelemetry + Prometheus + Grafana |
| 日志 | Loki / ELK |
| 部署 | Docker / Kubernetes |
13. 路线落地
第一阶段:单 Agent 服务化
FastAPI + Swagger + 单个 Supervisor Agent
目标:能通过 HTTP 调 Agent。
第二阶段:工具调用标准化
Supervisor Agent + MCP Tools
目标:Agent 可以调用搜索、数据库、文件、GIS 等工具。
第三阶段:多 Agent 协作
Supervisor Agent + A2A Remote Agents
目标:主控 Agent 可以调专业 Agent。
第四阶段:异步与流式任务
Task Store + Queue + SSE
目标:支持长任务、进度条、中间结果。
第五阶段:生产级治理
Auth + Audit + Trace + Guardrails + Cost Control
目标:能上线企业系统。
14. 总结
进阶 AI Agent 工程架构不是一个 Prompt,也不是一个聊天机器人,而是一套完整后端系统:
FastAPI = Agent 服务入口
Swagger/OpenAPI = API 说明书
A2A = Agent 协作协议
MCP = 工具调用协议
Queue = 长任务执行
Memory = 上下文与知识
Artifact = 可交付结果
Observability = 生产监控
Guardrails = 安全边界
最小可上线版本建议:
FastAPI
+ Swagger/OpenAPI
+ Supervisor Agent
+ MCP Tool Layer
+ A2A Agent Card
+ Redis Queue
+ PostgreSQL Task Store
+ SSE Streaming
+ OpenTelemetry Trace
参考链接:
[1]: https://fastapi.tiangolo.com/?utm_source=chatgpt.com “FastAPI - FastAPI”
[2]: https://github.com/a2aproject/A2A/blob/main/docs/specification.md?utm_source=chatgpt.com “A2A/docs/specification.md at main · a2aproject/A2A”
[3]: https://fastapi.tiangolo.com/reference/openapi/docs/?utm_source=chatgpt.com “OpenAPI docs - FastAPI”
[4]: https://codelabs.developers.google.com/intro-a2a-purchasing-concierge?utm_source=chatgpt.com “Getting Started with Agent2Agent (A2A) Protocol - Codelabs”
更多推荐

所有评论(0)