跳到主要内容

队列与异步任务

核对日期:2026-05-09。

1. 定义与边界

队列与异步任务用于把耗时、不稳定、可重试的 Agent 工作从同步请求链路中拆出来。典型场景包括长文档处理、多步骤工具执行、批量评测、后台回调、人工审批后的继续执行。

它不是为了“所有事情都异步化”。低延迟、强交互、必须即时反馈的步骤仍应保持同步或流式。

2. 为什么重要

模型和工具调用存在高延迟、限流和失败。把长任务放在 API 请求中会造成连接超时、资源占用和用户体验不稳定。队列提供削峰、隔离、重试、延迟执行和 worker 水平扩展能力。

3. 核心机制

任务消息建议结构:

{
"task_id": "task_01",
"run_id": "run_01",
"session_id": "sess_01",
"task_type": "agent.resume",
"idempotency_key": "run_01:step_4",
"payload_ref": "object://payload/task_01.json",
"attempt": 1,
"priority": "normal",
"deadline_at": "2026-05-09T10:05:00Z"
}

执行流程:

4. 架构模式

模式适用场景注意点
普通任务队列后台处理、批量执行至少一次投递,需要幂等。
FIFO/有序队列同一会话或 run 必须顺序执行吞吐受 message group 限制。
延迟队列稍后重试、等待外部系统需要截止时间和取消机制。
Workflow 引擎长周期、多步骤、人类审批学习和运维成本更高。

5. 工程实现

worker 处理框架:

def handle_task(msg):
if dedupe.seen(msg.idempotency_key):
return ack()
with run_lock(msg.run_id, ttl=60):
state = state_store.get(msg.run_id)
if not can_execute(state, msg):
return ack()
result = execute_step(state)
state_store.apply(result.event)
dedupe.mark_done(msg.idempotency_key)
return ack()

队列参数要显式化:

  • 可见性超时 / ack deadline。
  • 最大重试次数。
  • 死信队列。
  • 优先级或独立队列。
  • worker 并发、预取数量、超时。

6. 生产实践

  • 同步 API 返回 run_id 和当前状态查询 URL,而不是等待长任务完成。
  • 长任务和短任务分队列,避免短任务被大任务阻塞。
  • 每个任务有 deadline,过期任务不继续执行副作用。
  • 对外部 webhook 使用签名校验和幂等处理。
  • 死信队列要有告警和人工处理 runbook。

7. 常见反模式

  • 任务没有幂等键,重试后重复创建工单、付款或发邮件。
  • worker 无超时,网络阻塞导致并发耗尽。
  • 所有任务共用一个队列,低优先级批处理压垮实时任务。
  • ack 太早,任务执行失败后消息丢失。
  • ack 太晚但任务不幂等,worker 崩溃后重复副作用。

8. 评测方法

  • 可靠性:模拟 worker crash、队列重复投递、网络超时。
  • 吞吐:压测 P95 排队时间、执行时间和 backlog。
  • 恢复:死信任务能否按 runbook 重放或补偿。
  • 成本:队列重试是否放大模型调用成本。

9. 安全与治理

  • 队列消息不直接放敏感正文,使用受控对象存储引用。
  • worker 权限按队列和工具域拆分,不给后台进程全量权限。
  • 对高风险任务设置人工审批状态,不允许模型绕过。
  • 任务日志隐藏敏感参数,Celery 文档也提醒日志脱敏并不等于消息本身加密。

10. 权威资料

11. 二次精修:任务 schema 与队列拓扑

Agent 队列任务必须显式表达幂等键、预算、超时、重试和审批上下文,不能只塞一段自然语言。

{
"task_id": "task_01J...",
"task_type": "agent.run",
"idempotency_key": "tenant:user:business_request_hash",
"tenant_id": "t_123",
"priority": "normal",
"payload": {"agent_id": "refund_agent", "session_id": "ses_456", "input_ref": "blob://requests/abc"},
"budget": {"max_attempts": 3, "max_runtime_seconds": 300, "max_cost_usd": 0.5},
"retry_policy": {"strategy": "exponential_backoff", "base_seconds": 5, "max_seconds": 120, "jitter": true},
"trace_context": {"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "parent_span_id": "00f067aa0ba902b7"}
}
队列适用任务关键控制
agent-run普通 Agent 执行并发、预算、超时
tool-io慢工具或外部 API熔断、限流、重试
approval等待人工审批SLA、提醒、过期
replay回放与调试只读工具替身
eval离线评测固定模型和数据集版本
dlq多次失败任务人工分诊和根因标签

12. 异步执行流程

13. 运维与灾备

  • 队列必须有可观测指标:lag、oldest message age、in-flight、retry count、DLQ count、consumer error rate。
  • 长任务不要依赖单个进程内存,worker 重启后应通过状态存储恢复。
  • 外部工具超时要短于任务总超时,避免整个 worker 池被慢调用占满。
  • 灾备演练要验证:队列积压后扩容、DLQ 重放、重复消息去重、跨区域恢复。
  • 安全上要避免把完整用户输入放入消息体,优先放 input_ref,消息队列仅保存引用和必要元数据。

14. 补充权威资料

15. 主控验收清单

  • 任务 schema 是否包含 idempotency_key、budget、retry_policy 和 trace_context。
  • 是否区分普通执行、慢工具、审批、回放、评测和 DLQ 队列。
  • 是否监控 queue lag、oldest message age、in-flight 和 DLQ。
  • worker 重启后是否能从状态存储恢复。
  • 是否验证重复消息不会重复副作用。
  • 是否有 DLQ 重放流程和审批。
  • 是否对高优先级任务设置公平调度,避免饿死普通任务。
  • 是否限制消息体中的敏感数据。
  • 是否对外部 API 限流和退避。
  • 是否做过队列积压和恢复演练。