A2A中的流式与异步操作

Agent2Agent (A2A)协议专为处理可能不会立即完成的任务而设计。许多AI驱动的操作可能是长时间运行的、涉及多个步骤的、产生增量结果的或需要人工干预的。A2A提供了强大的机制来管理此类异步交互,确保客户端无论保持持续连接还是以更断开的方式操作,都能有效地接收更新。

1. 使用服务器发送事件(SSE)进行流式传输

对于产生增量结果(如生成长文档或流媒体)或提供持续状态更新的任务,A2A支持使用服务器发送事件(SSE)进行实时通信。这适用于客户端可以与A2A服务器保持活动HTTP连接的情况。

关键特性:

  • 初始化: 客户端使用tasks/sendSubscribe RPC方法发送初始消息(如提示或命令),同时订阅该任务的更新。
  • 服务器能力: A2A服务器必须通过在其Agent Card中设置capabilities.streaming: true来表明其对流式传输的支持。
  • 服务器响应(连接): 如果订阅成功,服务器会返回HTTP 200 OK状态和Content-Type: text/event-stream。此HTTP连接保持打开状态以便服务器推送事件。
  • 事件结构: 服务器通过此流发送事件。每个事件的data字段包含一个JSON-RPC 2.0响应对象,特别是SendTaskStreamingResponse。此JSON-RPC响应中的id与客户端原始tasks/sendSubscribe请求中的id匹配。
  • 事件类型(在SendTaskStreamingResponse.result中):
    • TaskStatusUpdateEvent: 传达任务生命周期状态的变化(如从working变为input-requiredcompleted)。它还可以提供来自代理的中间消息(如"我正在分析数据...")。
    • TaskArtifactUpdateEvent: 传递任务生成的新或更新的Artifacts。这用于以块的形式流式传输大文件或数据结构。Artifact对象本身包含indexappendlastChunk等字段,以帮助客户端重新组装完整的工件。
  • 流终止: 服务器通过在一个TaskStatusUpdateEvent中设置final: true来发出特定交互周期(即当前tasks/sendSubscribe请求)更新结束的信号。这通常发生在任务达到终端状态(completedfailedcanceled)或input-required状态(服务器期望从客户端获得进一步输入)时。发送final: true事件后,服务器通常会关闭该特定请求的SSE连接。
  • 重新订阅: 如果客户端的SSE连接在任务仍处于活动状态时过早中断(且服务器尚未为该阶段发送final: true事件),客户端可以尝试使用tasks/resubscribe RPC方法重新连接到流。关于断开连接期间错过事件的行为(如是否回填或仅发送新更新)取决于具体实现。

何时使用流式传输:

  • 实时监控长时间运行任务的进度
  • 增量接收大结果(工件),允许在完整结果可用前开始处理
  • 交互式对话交换,即时反馈或部分响应有益时
  • 需要从代理获取低延迟更新的应用

参考协议规范了解详细结构:

2. 断开场景的推送通知

对于非常长时间运行的任务(如持续几分钟、几小时甚至几天)或当客户端不能或不愿保持持久连接时(如移动客户端或无服务器函数),A2A支持通过推送通知进行异步更新。此机制允许A2A服务器在发生重大任务更新时主动通知客户端提供的webhook。

关键特性:

  • 服务器能力: A2A服务器必须通过在其Agent Card中设置capabilities.pushNotifications: true来表明其对推送通知的支持。
  • 配置: 客户端向服务器提供PushNotificationConfig。此配置可以通过以下方式提供:
    • 在初始tasks/sendtasks/sendSubscribe请求中(通过TaskSendParams中的可选pushNotification参数)
    • 单独使用tasks/pushNotification/set RPC方法为现有任务设置 PushNotificationConfig包括:
    • url: A2A服务器应发送(POST)任务更新通知的绝对HTTPS webhook URL
    • token(可选): 客户端生成的不透明字符串(如密钥或任务特定标识符)。服务器应将此令牌包含在通知请求中(如在自定义头X-A2A-Notification-Token中)以供客户端的webhook接收器验证
    • authentication(可选): 一个AuthenticationInfo对象,指定A2A服务器应如何向客户端的webhook URL进行身份验证。客户端(webhook的接收方)定义这些身份验证要求
  • 通知触发: A2A服务器决定何时发送推送通知。通常,这发生在任务达到重大状态变化时,如转换到终端状态(completedfailedcanceled)或input-required状态,特别是在其关联消息和工件完全生成并稳定后
  • 通知负载: A2A协议本身并不严格定义服务器发送给客户端webhook的推送通知的HTTP主体负载。然而,通知应包含足够的信息供客户端识别taskId并理解更新的性质(如新的TaskState)。服务器可能发送最小负载(仅taskId和新状态)或更全面的负载(如摘要或完整的Task对象)
  • 客户端操作: 在收到推送通知(并成功验证其真实性和相关性)后,客户端通常使用带有通知中taskIdtasks/get RPC方法来检索完整的更新后的Task对象,包括任何新工件或详细消息

推送通知服务(客户端Webhook基础设施):

  • PushNotificationConfig.url中指定的目标url指向一个推送通知服务。此服务是客户端侧的一个组件(或客户端订阅的服务),负责接收来自A2A服务器的HTTP POST通知
  • 其职责包括:
    • 对传入通知进行身份验证(即验证其来自合法的A2A服务器)
    • 验证通知的相关性(如检查token)
    • 将通知或其内容中继到适当的客户端应用逻辑或系统
  • 在简单场景中(如本地开发),客户端应用本身可能直接暴露webhook端点
  • 在企业或生产环境中,这通常是一个健壮、安全的服务,处理传入webhook、验证调用者并路由消息(如到消息队列、内部API、移动推送通知网关或其他事件驱动系统)

推送通知的安全考虑

由于推送通知的异步和服务器发起的出站性质,安全性至关重要。发送通知的A2A服务器和客户端的webhook接收器都有责任。

A2A服务器安全(当向客户端Webhook发送通知时)

  1. Webhook URL验证:

    • 服务器不应盲目信任并发送POST请求到客户端在PushNotificationConfig中提供的任何url。恶意客户端可能提供指向内部服务或无关第三方系统的URL以造成危害(服务器端请求伪造-SSRF攻击)或充当分布式拒绝服务(DDoS)放大器
    • 缓解策略:
      • 允许列表: 如果可行,维护webhook URL的受信任域或IP范围的允许列表
      • 所有权验证/挑战-响应: 在发送实际通知前,服务器可以(理想情况下应该)执行验证步骤。例如,它可以向提议的webhook URL发出带有唯一validationToken(作为查询参数或头)的HTTP GETOPTIONS请求。webhook服务必须适当响应(如回显令牌或确认准备就绪)以证明所有权和可达性。A2A Python示例演示了一个简单的验证令牌检查机制
      • 网络控制: 使用出口防火墙或网络策略限制A2A服务器可以发送出站HTTP请求的位置
  2. 向客户端Webhook进行身份验证:

    • A2A服务器必须根据PushNotificationConfig.authentication中指定的方案向客户端的webhook URL进行身份验证
    • 服务器到服务器webhook的常见身份验证方案包括:
      • Bearer令牌(OAuth 2.0): A2A服务器获取访问令牌(如使用OAuth 2.0客户端凭据授权流,如果webhook提供商支持它)用于代表客户端webhook的受众/范围,并将其包含在通知POST请求的Authorization: Bearer <token>头中
      • API密钥: 预共享的API密钥,A2A服务器将其包含在特定HTTP头中(如X-Api-Key)
      • HMAC签名: A2A服务器使用共享密钥通过HMAC签署请求负载(或部分请求),并将签名包含在头中(如X-Hub-Signature)。webhook接收器然后验证此签名
      • 相互TLS(mTLS): 如果客户端的webhook基础设施支持,A2A服务器可以呈现客户端TLS证书

客户端Webhook接收器安全(当从A2A服务器接收通知时)

  1. 验证A2A服务器:

    • webhook端点必须严格验证传入通知请求的真实性,以确保它们源自合法的A2A服务器而非冒名顶替者
    • 验证签名/令牌:
      • 如果使用JWT(如作为Bearer令牌),根据A2A服务器的受信任公钥(如从A2A服务器提供的JWKS端点获取,如果适用)验证JWT的签名。同时验证声明如iss(签发者)、aud(受众-应标识您的webhook)、iat(签发时间)和exp(过期时间)
      • 如果使用HMAC签名,使用共享密钥在接收的负载上重新计算签名,并将其与请求头中的签名进行比较
      • 如果使用API密钥,确保密钥有效且已知
    • 验证PushNotificationConfig.token: 如果客户端在为其任务设置通知时在PushNotificationConfig中提供了不透明的token,webhook应检查传入通知是否包含此确切令牌(如在自定义头X-A2A-Notification-Token中)。这有助于确保通知是针对此特定客户端上下文和任务的,增加了一层授权
  2. 防止重放攻击:

    • 时间戳: 通知应理想情况下包含时间戳(如JWT中的iat-签发时间声明,或自定义时间戳头)。webhook应拒绝太旧的通知(如早于几分钟)以防止攻击者重放捕获的旧通知。时间戳应是签名负载的一部分(如果使用签名)以确保其完整性
    • Nonces/唯一ID: 对于关键通知,考虑为每个通知使用唯一的、一次性标识符(nonces或事件ID)。webhook应跟踪接收到的ID(在合理窗口内)以防止处理重复通知。JWT的jti(JWT ID)声明可服务于这一目的
  3. 安全密钥管理和轮换:

    • 如果使用加密密钥(HMAC的对称密钥,或JWT签名/mTLS的非对称密钥对),实施安全密钥管理实践,包括定期密钥轮换
    • 对于A2A服务器签名且客户端webhook验证的非对称密钥,像JWKS(JSON Web密钥集)这样的协议允许服务器发布其公钥(包括轮换期间的新密钥)到一个知名端点。客户端webhook然后可以动态获取正确的公钥进行签名验证,促进更顺畅的密钥轮换
示例非对称密钥流(JWT + JWKS)
  1. 客户端设置PushNotificationConfig,指定authentication.schemes: ["Bearer"]以及可能的JWT的预期issueraudience
  2. A2A服务器在发送通知时:
    • 生成JWT,用其私钥签名。JWT包含声明如iss(签发者)、aud(受众-webhook)、iat(签发时间)、exp(过期)、jti(JWT ID)和taskId
    • JWT头(algkid)指示签名算法和密钥ID
    • A2A服务器通过JWKS端点(webhook提供商可能知道或发现此端点的URL)使其公钥可用
  3. 客户端Webhook在收到通知时:
    • Authorization头提取JWT
    • 检查JWT头中的kid
    • 从A2A服务器的JWKS端点获取相应的公钥(建议缓存密钥)
    • 使用公钥验证JWT签名
    • 验证声明(issaudiatexpjti)
    • 检查PushNotificationConfig.token(如果提供)

这种全面、分层的推送通知安全方法确保了消息的真实性、完整性和及时性,同时保护了发送方A2A服务器和接收方客户端webhook基础设施。