队列与异步任务
核对日期:2026-05-09。
1. 定义与边界
队列与异步任务用于把耗时、不稳定、可重试的 Agent 工作从同步请求链路中拆出来。典型场景包括长文档处理、多步骤工具执行、批量评测、后台回调、人工审批后的继续执行。
它不是为了“所有事情都异步化”。低延迟、强交互、必须即时反馈的步骤仍应保持同步或流式。
2. 为什么重要
模型和工具调用存在高延迟、限流和失败。把长任务放在 API 请求中会造成连接超时、资源占用和用户体验不稳定。队列提供削峰、隔离、重试、延迟执行和 worker 水平扩展能力。
3. 核心机制
任务消息建议结构:
{
"task_id": "task_01",
"run_id": "run_01",
"session_id": "sess_01",
"task_type": "agent.resume",
"idempotency_key": "run_01:step_4",
"payload_ref": "object://payload/task_01.json",
"attempt": 1,
"priority": "normal",
"deadline_at": "2026-05-09T10:05:00Z"
}
执行流程:
4. 架构模式
| 模式 | 适用场景 | 注意点 |
|---|---|---|
| 普通任务队列 | 后台处理、批量执行 | 至少一次投递,需要幂等。 |
| FIFO/有序队列 | 同一会话或 run 必须顺序执行 | 吞吐受 message group 限制。 |
| 延迟队列 | 稍后重试、等待外部系统 | 需要截止时间和取消机制。 |
| Workflow 引擎 | 长周期、多步骤、人类审批 | 学习和运维成本更高。 |
5. 工程实现
worker 处理框架:
def handle_task(msg):
if dedupe.seen(msg.idempotency_key):
return ack()
with run_lock(msg.run_id, ttl=60):
state = state_store.get(msg.run_id)
if not can_execute(state, msg):
return ack()
result = execute_step(state)
state_store.apply(result.event)
dedupe.mark_done(msg.idempotency_key)
return ack()
队列参数要显式化:
- 可见性超时 / ack deadline。
- 最大重试次数。
- 死信队列。
- 优先级或独立队列。
- worker 并发、预取数量、超时。
6. 生产实践
- 同步 API 返回
run_id和当前状态查询 URL,而不是等待长任务完成。 - 长任务和短任务分队列,避免短任务被大任务阻塞。
- 每个任务有 deadline,过期任务不继续执行副作用。
- 对外部 webhook 使用签名校验和幂等处理。
- 死信队列要有告警和人工处理 runbook。
7. 常见反模式
- 任务没有幂等键,重试后重复创建工单、付款或发邮件。
- worker 无超时,网络阻塞导致并发耗尽。
- 所有任务共用一个队列,低优先级批处理压垮实时任务。
- ack 太早,任务执行失败后消息丢失。
- ack 太晚但任务不幂等,worker 崩溃后重复副作用。
8. 评测方法
- 可靠性:模拟 worker crash、队列重复投递、网络超时。
- 吞吐:压测 P95 排队时间、执行时间和 backlog。
- 恢复:死信任务能否按 runbook 重放或补偿。
- 成本:队列重试是否放大模型调用成本。
9. 安全与治理
- 队列消息不直接放敏感正文,使用受控对象存储引用。
- worker 权限按队列和工具域拆分,不给后台进程全量权限。
- 对高风险任务设置人工审批状态,不允许模型绕过。
- 任务日志隐藏敏感参数,Celery 文档也提醒日志脱敏并不等于消息本身加密。
10. 权威资料
- Celery tasks and retries: https://docs.celeryq.dev/en/stable/userguide/tasks.html
- Amazon SQS exactly-once processing for FIFO queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-exactly-once-processing.html
- Temporal documentation: https://docs.temporal.io/
- OpenAI Background mode: https://developers.openai.com/api/docs/guides/background
- OpenAI Webhooks: https://developers.openai.com/api/docs/guides/webhooks
11. 二次精修:任务 schema 与队列拓扑
Agent 队列任务必须显式表达幂等键、预算、超时、重试和审批上下文,不能只塞一段自然语言。
{
"task_id": "task_01J...",
"task_type": "agent.run",
"idempotency_key": "tenant:user:business_request_hash",
"tenant_id": "t_123",
"priority": "normal",
"payload": {"agent_id": "refund_agent", "session_id": "ses_456", "input_ref": "blob://requests/abc"},
"budget": {"max_attempts": 3, "max_runtime_seconds": 300, "max_cost_usd": 0.5},
"retry_policy": {"strategy": "exponential_backoff", "base_seconds": 5, "max_seconds": 120, "jitter": true},
"trace_context": {"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "parent_span_id": "00f067aa0ba902b7"}
}
| 队列 | 适用任务 | 关键控制 |
|---|---|---|
agent-run | 普通 Agent 执行 | 并发、预算、超时 |
tool-io | 慢工具或外部 API | 熔断、限流、重试 |
approval | 等待人工审批 | SLA、提醒、过期 |
replay | 回放与调试 | 只读工具替身 |
eval | 离线评测 | 固定模型和数据集版本 |
dlq | 多次失败任务 | 人工分诊和根因标签 |
12. 异步执行流程
13. 运维与灾备
- 队列必须有可观测指标:lag、oldest message age、in-flight、retry count、DLQ count、consumer error rate。
- 长任务不要依赖单个进程内存,worker 重启后应通过状态存储恢复。
- 外部工具超时要短于任务总超时,避免整个 worker 池被慢调用占满。
- 灾备演练要验证:队列积压后扩容、DLQ 重放、重复消息去重、跨区域恢复。
- 安全上要避免把完整用户输入放入消息体,优先放
input_ref,消息队列仅保存引用和必要元数据。
14. 补充权威资料
- Temporal docs: https://docs.temporal.io/ (核对日期:2026-05-09)
- AWS SQS best practices: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-best-practices.html (核对日期:2026-05-09)
- Celery retrying: https://docs.celeryq.dev/en/stable/userguide/tasks.html (核对日期:2026-05-09)
15. 主控验收清单
- 任务 schema 是否包含 idempotency_key、budget、retry_policy 和 trace_context。
- 是否区分普通执行、慢工具、审批、回放、评测和 DLQ 队列。
- 是否监控 queue lag、oldest message age、in-flight 和 DLQ。
- worker 重启后是否能从状态存储恢复。
- 是否验证重复消息不会重复副作用。
- 是否有 DLQ 重放流程和审批。
- 是否对高优先级任务设置公平调度,避免饿死普通任务。
- 是否限制消息体中的敏感数据。
- 是否对外部 API 限流和退避。
- 是否做过队列积压和恢复演练。