Event-driven-Agent架构
核对日期:2026-05-09。
1. 定义与边界
Event-driven Agent 架构用事件触发 Agent 运行。事件可以来自消息队列、数据库变更、告警、Webhook、文件上传、定时任务或其他 Agent 的输出。Agent 不再只由用户对话驱动,而是成为异步业务系统中的消费者、处理器或编排节点。
它不是简单的“后台任务”。核心差异在于:事件携带业务状态变化,Agent 的输入输出要满足消息语义、幂等、重试、顺序和审计要求。
2. 为什么重要
很多生产 Agent 需要在用户不在线时工作:监控告警、销售线索跟进、工单自动分派、合规扫描、代码仓库事件处理、知识库增量更新。事件驱动架构能解耦触发源、执行器和下游动作。
适用:
- 异步任务、批处理、告警响应、运营自动化。
- 多系统集成,事件源和处理逻辑需要解耦。
- 需要削峰、重试、死信队列和审计。
不适用:
- 强交互式、多轮澄清任务。
- 低延迟同步请求且无异步容忍。
3. 核心机制
事件 envelope:
{
"event_id": "evt_001",
"event_type": "ticket.created",
"occurred_at": "2026-05-09T10:00:00Z",
"subject": "ticket_123",
"actor": "user_456",
"payload_ref": "s3://bucket/payload.json",
"trace_id": "trace_abc",
"idempotency_key": "ticket_123:v1"
}
4. 架构模式
| 模式 | 描述 | 适用 |
|---|---|---|
| Agent consumer | Agent 订阅某类事件并处理 | 工单分类、告警摘要 |
| Agent enrichment | Agent 给事件补充结构化字段 | 线索评分、风险标签 |
| Agent saga participant | Agent 作为长流程中的一步 | 理赔、采购、审批 |
| Agent event emitter | Agent 结果触发后续事件 | 自动创建任务、通知 |
| Multi-agent event mesh | Agent 之间通过事件协作 | 大规模异步研究/运营 |
5. 工程实现
def handle_event(event):
if already_processed(event.idempotency_key):
return
with trace(event.trace_id):
payload = load_payload(event.payload_ref)
decision = policy_filter(event, payload)
if decision.blocked:
emit("agent.blocked", reason=decision.reason)
return
result = agent.run(payload, context={"event": event})
validate_result(result)
persist_result(event.event_id, result)
emit("agent.completed", result_ref=result.ref)
工程要点:
- 所有事件有全局唯一
event_id和业务幂等键。 - 大 payload 用引用传递,避免消息队列塞入敏感长文本。
- Agent worker 无状态,状态存 checkpoint/database。
- 工具调用要支持重试、超时和幂等。
- 失败事件进入 dead letter queue,并附带 trace。
6. 生产实践
- 明确 at-least-once 还是 exactly-once 语义;多数队列是 at-least-once,需要幂等处理。
- 处理乱序:使用版本号、时间戳和状态机校验。
- 对同一 subject 加锁或串行化,避免并发 Agent 写冲突。
- 使用 backpressure:队列长度、并发数、模型速率限制联动。
- 对失败分类:可重试、不可重试、需人工、需补偿。
7. 常见反模式
- 无幂等键,重试导致重复发信、重复退款。
- Agent 从队列消息里直接读取未脱敏敏感数据。
- 事件 schema 无版本,发布方改字段导致消费者误判。
- 把 Agent memory 当事件存储,无法回放和审计。
- 没有死信队列,失败消息无限重试。
8. 评测方法
- Event Handling Accuracy:事件是否被正确分类和处理。
- Idempotency Test:重复投递是否只产生一次副作用。
- Replay Test:历史事件重放是否得到可解释结果。
- Latency SLO:从事件产生到处理完成的耗时。
- DLQ Rate:死信比例和主要失败原因。
9. 安全与治理
- 事件源鉴权,防止伪造事件触发高风险 Agent。
- payload 引用使用短期授权,避免长期暴露。
- 对下游动作使用最小权限和审批 gate。
- 日志中不记录完整敏感 payload。
- 对事件 schema 和 route 配置做版本管理。
10. 工程手册补充
10.1 控制流、状态流、工具流
Event-driven Agent 的核心是事件契约,不是让 Agent 随机监听消息。
| 流 | 设计要求 | 观测点 |
|---|---|---|
| 控制流 | 事件触发 handler,handler 不直接阻塞上游请求 | event_id、handler_name、attempt |
| 状态流 | 以事件 id 和业务聚合 id 做幂等去重 | aggregate_id、checkpoint_version |
| 工具流 | 工具调用必须可重试或可查证;写操作带幂等键 | idempotency_key、side_effect_status |
| 恢复流 | retry、DLQ、人工补偿是架构的一部分 | retry_count、dlq_reason |
10.2 适用与不适用边界
适用:
- 订单、工单、告警、文档变更等天然由事件驱动的系统。
- 需要异步处理、削峰填谷、长耗时后台任务。
- 可接受最终一致性,并能靠状态机追踪进度。
不适用:
- 用户要求同步、低延迟、一步完成的问答。
- 外部工具不可幂等,失败后无法确认状态。
- 事件 schema 频繁变化且没有版本治理。
失败恢复与上线清单:
- 每个事件 schema 必须有
event_type、event_version、event_id、occurred_at。 - Handler 先做去重,再调用模型或工具,避免重复消费造成重复写。
- DLQ 要有重放工具,但重放前必须展示原始事件、当前状态和预期副作用。
- 监控事件积压、处理延迟、重试率、DLQ 比例、重复消费命中率、handler 成本。
- 安全上要限制事件来源,外部事件只作为事实输入,不得直接携带系统指令。
11. 权威资料
- AWS Step Functions documentation: https://aws.amazon.com/documentation-overview/step-functions/
- AWS Step Functions service integration patterns: https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
- Temporal durable execution: https://temporal.io/
- LangGraph durable execution: https://docs.langchain.com/oss/python/langgraph/durable-execution
- LangGraph persistence: https://docs.langchain.com/oss/python/langgraph/persistence
- MCP Authorization specification: https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization