7.4 KiB
7.4 KiB
任务队列后端改造提示词
概述
你需要改造现有的任务队列后端,使其能够与 ComfyUI Cluster Bridge(桥接器)建立并维持可靠的双向通信。桥接器位于任务队列和 ComfyUI 后端之间,负责任务的转发和结果的回调。
通信架构
客户端 → 任务队列后端 ←(WebSocket)→ 桥接器 → ComfyUI后端
↑_______________________________(Webhook回调)_____|
核心要求
1. WebSocket 服务器
任务队列后端需要实现一个 WebSocket 服务器,桥接器会主动连接到该服务器。
连接地址配置
- WebSocket 路径建议:
/ws - 桥接器可通过后台管理界面修改连接地址
- 默认配置:
ws://localhost:8080/ws
连接生命周期
- 桥接器启动时会主动连接
- 连接断开后,桥接器每分钟尝试重连一次,直至成功
- 任务队列后端应支持多个桥接器同时连接
2. 消息协议
消息通用格式
所有消息采用 JSON 格式,结构如下:
{
"type": "消息类型",
"data": { ... }
}
桥接器 → 任务队列(桥接器发送的消息)
REGISTER - 注册消息
桥接器连接成功后立即发送:
{
"type": "REGISTER",
"data": {
"bridgeId": "uuid字符串",
"instanceCount": 8,
"availableInstanceCount": 5,
"timestamp": "2024-01-01T00:00:00.000Z"
}
}
HEARTBEAT - 心跳消息
桥接器定期发送(建议每30秒一次):
{
"type": "HEARTBEAT",
"data": {
"instanceCount": 8,
"availableInstanceCount": 5,
"busyInstanceCount": 3,
"timestamp": "2024-01-01T00:00:00.000Z"
}
}
TASK_ACK - 任务确认消息
桥接器收到任务后立即发送:
{
"type": "TASK_ACK",
"data": {
"code": 0,
"msg": "success",
"data": {
"taskId": "1910246754753896450",
"taskStatus": "RUNNING"
}
}
}
PONG - 心跳响应
响应任务队列的 PING 消息:
{
"type": "PONG"
}
任务队列 → 桥接器(任务队列发送的消息)
TASK_ASSIGN - 任务分配消息
{
"type": "TASK_ASSIGN",
"data": {
"workflowId": "1904136902449209346",
"nodeInfoList": [
{
"nodeId": "6",
"fieldName": "text",
"fieldValue": "1 girl in classroom"
},
{
"nodeId": "3",
"fieldName": "seed",
"fieldValue": "1231231"
}
],
"webhookUrl": "https://shuzhiren.xueai.art/callback"
}
}
PING - 心跳检测
任务队列可定期发送:
{
"type": "PING"
}
3. Webhook 回调接口
任务队列后端需要提供一个 HTTP POST 接口用于接收任务完成回调。
回调请求格式
POST {webhookUrl}
Content-Type: application/json
{
"event": "TASK_END",
"taskId": "1910246754753896450",
"eventData": "{\"code\":0,\"msg\":\"success\",\"data\":[{\"fileUrl\":\"https://example.com/image.png\",\"fileType\":\"png\",\"taskCostTime\":0,\"nodeId\":\"9\"}]}"
}
eventData 字段说明
eventData 是 JSON 字符串,解析后结构如下:
成功时:
{
"code": 0,
"msg": "success",
"data": [
{
"fileUrl": "https://example.com/output.png",
"fileType": "png",
"taskCostTime": 0,
"nodeId": "9"
}
]
}
失败时:
{
"code": 1,
"msg": "错误信息",
"data": []
}
4. 任务调度逻辑
任务队列应根据桥接器的可用实例数量来分配任务:
- 桥接器注册时:记录该桥接器的总实例数和可用实例数
- 心跳更新时:更新桥接器的实例状态
- 任务分配时:
- 选择有可用实例的桥接器
- 根据
availableInstanceCount决定发送多少任务 - 发送任务后,暂减该桥接器的可用计数
- 收到 TASK_ACK 或 TASK_END 回调后更新状态
5. 错误处理
WebSocket 连接错误
- 桥接器断开连接后,任务队列应:
- 标记该桥接器为离线
- 停止向其发送新任务
- 已发送但未完成的任务根据需要重新分配
任务超时
- 任务队列应设置任务超时机制
- 超时任务可标记为失败或尝试重新分配
- 超时时间建议:5-10分钟
Webhook 回调失败
- 桥接器会尝试发送回调,但可能失败
- 任务队列应考虑实现回调重试机制
- 或提供查询任务状态的接口供桥接器轮询
实施步骤
第一步:实现 WebSocket 服务器
- 创建 WebSocket 服务器,监听指定端口
- 实现连接管理,支持多个桥接器同时连接
- 实现消息解析和分发
第二步:实现注册和心跳机制
- 处理 REGISTER 消息,记录桥接器信息
- 处理 HEARTBEAT 消息,更新桥接器状态
- 实现桥接器离线检测(如超过3个心跳周期未收到消息)
第三步:实现任务分配
- 创建任务队列存储
- 实现任务分配逻辑,根据可用实例选择桥接器
- 发送 TASK_ASSIGN 消息
- 处理 TASK_ACK 确认消息
第四步:实现 Webhook 回调接口
- 创建 HTTP POST 接口接收回调
- 解析 eventData 字段
- 更新任务状态
- 触发后续业务逻辑
第五步:实现错误处理和重试
- 实现桥接器离线后的任务重分配
- 实现任务超时机制
- 添加监控和日志
API 接口定义(任务队列后端)
WebSocket 端点
| 路径 | 方法 | 说明 |
|---|---|---|
/ws |
WebSocket | 桥接器连接端点 |
HTTP 端点
| 路径 | 方法 | 说明 |
|---|---|---|
/callback |
POST | 接收任务完成回调 |
消息结构速查表
任务队列发送的消息
| 类型 | 说明 | 触发时机 |
|---|---|---|
| TASK_ASSIGN | 分配任务 | 有新任务需要处理时 |
| PING | 心跳检测 | 定期发送 |
桥接器发送的消息
| 类型 | 说明 | 触发时机 |
|---|---|---|
| REGISTER | 注册桥接器 | 连接成功后立即 |
| HEARTBEAT | 心跳 | 定期发送(30秒) |
| TASK_ACK | 任务确认 | 收到任务后立即 |
| PONG | 心跳响应 | 收到 PING 后 |
集成测试流程
测试 1:连接和注册
- 启动任务队列后端
- 启动桥接器
- 验证桥接器成功连接并发送 REGISTER 消息
- 验证任务队列正确记录桥接器信息
测试 2:任务分配和回调
- 通过任务队列提交一个测试任务
- 验证任务被分配到桥接器(TASK_ASSIGN)
- 验证桥接器发送 TASK_ACK
- 等待任务完成
- 验证收到 Webhook 回调
- 验证回调数据格式正确
测试 3:断开重连
- 建立连接后,断开桥接器
- 验证任务队列检测到离线
- 等待1分钟,验证桥接器自动重连
- 验证重连后正常工作
测试 4:多实例负载
- 配置桥接器有多个 ComfyUI 实例
- 同时提交多个任务
- 验证任务按实例数量分配
- 验证所有任务正常完成
注意事项
- webhookUrl 的传递:任务队列必须在 TASK_ASSIGN 消息中提供
webhookUrl,桥接器通过该 URL 发送回调 - taskId 的对应:TASK_ACK 和回调中的 taskId 应与任务队列中的任务标识对应
- eventData 的格式:eventData 是 JSON 字符串,需要先解析再使用
- 可用性监控:任务队列应监控桥接器的在线状态,避免向离线桥接器发送任务
- 幂等性:考虑实现回调接口的幂等性,防止重复处理