# 任务队列后端 - 业务流程分析报告 ## 一、项目概述 **项目名称**: ComfyUI 任务队列后端 **核心功能**: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。 **技术栈**: - Node.js + Express - Redis(任务队列和数据存储) - WebSocket(实时通信) - Worker Threads(多线程处理) *** ## 二、系统架构 ### 2.1 核心组件 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 前端 / 后端客户端 │ └────────────────────────────┬────────────────────────────────────┘ │ WebSocket (8087) │ HTTP (8089 - 回调) ▼ ┌─────────────────────────────────────────────────────────────────┐ │ index.js (主入口) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ WebSocket │ │ HTTP Server │ │ Worker │ │ │ │ Server │ │ │ │ Manager │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ └─────────┼───────────────────┼───────────────────┼─────────────────┘ │ │ │ └───────────────────┴───────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Redis │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 等待队列 │ │ 处理队列 │ │ 结果队列 │ │ │ │ (Wait) │ │ (Process) │ │ (Result) │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 错误队列 │ │ 任务数据 │ │ 初始化配置 │ │ │ │ (Error) │ │ (Hash) │ │ (InitInfo) │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 2.2 Worker Threads 架构 系统使用 6 个独立的 Worker Threads 处理不同阶段的任务: | Worker 名称 | 功能描述 | 文件路径 | | ---------------- | ------------- | ------------------------------------------ | | assessment | 任务预处理、参数校验、入队 | `worker_threads/assessment/assessment.js` | | wait | 等待队列监控、任务分发 | `worker_threads/wait/waiting.js` | | polling | 轮询外部平台获取任务结果 | `worker_threads/process/process.js` | | result | 结果队列处理、返回给客户端 | `worker_threads/result/result.js` | | callback\_result | 回调结果处理 | `worker_threads/callback_result/result.js` | | error | 错误队列处理 | `worker_threads/error/error.js` | *** ## 三、完整业务流程 ### 3.1 流程总览图 ```mermaid flowchart TD Start([系统启动]) --> Init[初始化 Redis 队列] Init --> WS[WebSocket 服务启动] Init --> HTTP[HTTP 服务启动] WS --> ReceiveTask[接收任务: type=generate] ReceiveTask --> Assessment[Assessment Worker 处理] Assessment --> Validate{参数校验} Validate -->|失败| ErrorReturn[返回错误给客户端] Validate -->|成功| StoreTask[存储任务到 Redis Hash] StoreTask --> PushWait[推入等待队列] PushWait --> NotifySuccess[通知客户端: 任务提交成功] PushWait --> WaitWorker[Wait Worker 监控] WaitWorker --> CheckConcurrency{检查并发数} CheckConcurrency -->|并发未满| GetTasks[批量获取任务] GetTasks --> GenerateWorker[Generate Worker 分发] GenerateWorker --> ExternalPost[提交任务到外部平台] ExternalPost --> PostResult{提交结果} PostResult -->|失败| PushError[推入错误队列] PostResult -->|成功| CheckPlatform{平台类型} CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射] CheckPlatform -->|轮询型| PushPolling[推入轮询队列] StoreCallback --> WaitCallback[等待外部回调] PushPolling --> PollingWorker[Polling Worker 轮询] WaitCallback --> CallbackReceived[收到回调] PollingWorker --> PollingResult{轮询结果} CallbackReceived --> ProcessCallback[处理回调数据] PollingResult -->|完成| StoreResult[存储结果] PollingResult -->|未完成| ContinuePolling[继续轮询] ProcessCallback --> StoreResult StoreResult --> PushResultQueue[推入结果队列] PushResultQueue --> ResultWorker[Result Worker 处理] ResultWorker --> SendResult[发送结果给客户端] PushError --> ErrorWorker[Error Worker 处理] ErrorWorker --> SendError[发送错误给客户端] ContinuePolling --> PollingWorker ``` ### 3.2 详细流程步骤 #### 阶段 1: 系统启动与初始化 **步骤 1.1: 启动主程序** - 入口文件: [index.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\index.js) - 加载环境变量 (.env) - 创建 6 个 Worker Threads **步骤 1.2: Redis 初始化** - 连接 Redis - 初始化队列配置 ([initQueue.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\redis\initQueue.js)) - 创建 `InitInfo` 配置对象,包含: - 等待队列列表 - 各平台并发配置 - 任务计数器 **步骤 1.3: 启动服务** - HTTP 服务器 (端口: 8087) - WebSocket 服务器 (同端口) - 回调服务器 (端口: 8089, [app.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\app.js)) *** #### 阶段 2: 任务接收与预处理 **步骤 2.1: 接收 WebSocket 任务** - 客户端通过 WebSocket 连接,携带 token 和 id - 验证 token (TOKEN\_SECRET) - 接收 `type: "generate"` 消息 **步骤 2.2: Assessment Worker 处理** - 文件: [assessment.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\assessment.js) - 内部线程池: 3 个 [PreproTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\PreproTask.js) **步骤 2.3: 参数校验** - 校验必填字段: `taskId`, `platform`, `payload` - 失败返回: `JSONError`, `OpcodeError` **步骤 2.4: 存储任务** - 创建任务 Hash: `{prefix}:task:{taskId}` - 字段: `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `workflowId` - 设置过期时间: 2 小时 (7200秒) **步骤 2.5: 推入等待队列** - 队列命名: `{AIGC}:{platform}:wait` - 例如: `digitalHuman-test:comfyui:wait` - 更新 `InitInfo.platforms.{key}.WQtasks` +1 **步骤 2.6: 通知客户端** - 返回成功消息: `"任务提交成功,正在排队中..."` *** #### 阶段 3: 等待队列监控与任务分发 **步骤 3.1: Wait Worker 监控** - 文件: [waiting.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\waiting.js) - 循环检查各平台等待队列 (间隔: 15秒) **步骤 3.2: 判断并发容量** - 获取 `InitInfo.platforms.{key}.PQtasks` (正在处理数) - 获取 `InitInfo.platforms.{key}.MAX_CONCURRENT` (最大并发数) - 对于 comfyui 平台,还会检查内部算力 (messageDispatcher) **步骤 3.3: 批量获取任务** - 从等待队列获取任务 ID (不超过剩余并发数) - 批量获取任务数据 (Hash) **步骤 3.4: 更新计数器** - `WQtasks` -N (减少等待数) - `PQtasks` +N (增加处理数) - `PQtasksALL` +N (总处理数) **步骤 3.5: 分发到 Generate Worker** - 文件: [GenerateWorkerManager.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateWorkerManager.js) - 内部线程池: [GenerateThreadPool.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateThreadPool.js) *** #### 阶段 4: 提交任务到外部平台 **步骤 4.1: Generate Worker 处理** - 文件: [generatTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\generatTask.js) - 批量调用 `externalPostRequest()` **步骤 4.2: 外部平台接口调用** - 文件: [generat.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\generat.js) - 平台适配器: [outside.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\outside.js) **支持的平台**: | 平台 | 类型 | 适配器文件 | | ------- | --- | -------------------------------------------------------------------------------------------------------------- | | comfyui | 回调型 | [comfyui.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\comfyui.js) | | coze | 轮询型 | [coze/coze.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\coze\coze.js) | | jimuai | - | [JimuAI.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\JimuAI.js) | **comfyui 分发策略**: - 优先使用 `messageDispatcher` (内部算力) - 失败降级到 `runninghub` (外部平台) **步骤 4.3: 处理提交结果** **情况 A: 提交失败** - 存储错误信息到任务 Hash - 推入错误队列: `{prefix}:error:list` - `EQtaskALL` +1 **情况 B: 提交成功 (回调型平台)** - 存储映射: `{prefix}:callback:{remoteTaskId}` → `taskId` - 更新任务 Hash: `remoteTaskId` **情况 C: 提交成功 (轮询型平台)** - 推入轮询队列: `{prefix}:processPolling:{AIGC}:{platform}` - Hash 结构: `{remoteTaskId}` → `{taskId, platform, AIGC, workflowId}` - 更新任务 Hash: `remoteTaskId` *** #### 阶段 5: 任务结果获取 **分支 A: 回调型平台 (comfyui)** **步骤 5A.1: 接收回调** - 回调接口: `POST /callback/all` ([callback.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\callback.js)) - 立即返回 200 响应 **步骤 5A.2: 处理回调数据** - 通过 `remoteTaskId` 查询 `taskId` - 存储 `eventData` 到任务 Hash 的 `resultData` * 推入回调队列: `{prefix}:callback` * `CQtasksALL` +1 **步骤 5A.3: Callback Result Worker 处理** - 文件: [callback\_result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\callback_result\result.js) - 发送结果给客户端 *** **分支 B: 轮询型平台 (coze)** **步骤 5B.1: Polling Worker 监控** - 文件: [process.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\process\process.js) - 为每个平台启动独立轮询循环 - 动态轮询间隔: 5-30秒 (根据任务数调整) **步骤 5B.2: 查询外部平台** - 文件: [polling.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\polling.js) - 调用 `externalGetRequest()` - 批量查询 (每批最多 100 个任务) **步骤 5B.3: 判断任务状态** **情况 1: 任务未完成** - 继续轮询 (不做任何操作) **情况 2: 任务成功** - 存储结果到任务 Hash 的 `resultData` - 更新状态: `status = "success"` - 推入结果队列: `{prefix}:result:list` - 从轮询队列删除 - `PQtasks` -1, `RQtasksALL` +1 **情况 3: 任务失败** - 存储错误信息到 `resultData` - 更新状态: `status = "failed"` - 推入错误队列: `{prefix}:error:list` - 从轮询队列删除 - `PQtasks` -1, `EQtaskALL` +1 *** #### 阶段 6: 结果返回给客户端 **分支 A: 结果队列处理** - 文件: [result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\result\result.js) - 从结果队列获取任务 - 通过 WebSocket 发送给对应 `backendId` 的客户端 - 从结果队列删除 - `RQtasksALL` -1 **分支 B: 错误队列处理** - 文件: [error/error.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\error\error.js) - 从错误队列获取任务 - 通过 WebSocket 发送错误给客户端 - 从错误队列和任务 Hash 删除 - `EQtaskALL` -1, `WQtasks` -1 *** ## 四、任务状态转换图 ```mermaid stateDiagram-v2 [*] --> pending: 任务创建 pending --> processing: 从等待队列取出 processing --> success: 任务成功完成 processing --> failed: 任务失败 success --> [*]: 结果已发送 failed --> [*]: 错误已发送 note right of pending 存储在等待队列 WQtasks +1 end note note right of processing 存储在处理队列/轮询队列 PQtasks +1 end note note right of success 存储在结果队列 RQtasksALL +1 end note note right of failed 存储在错误队列 EQtaskALL +1 end note ``` *** ## 五、Redis 数据结构 ### 5.1 队列列表 | Key 模式 | 类型 | 描述 | | ------------------------------------------- | ---- | ------------------------ | | `{prefix}:{AIGC}:{platform}:wait` | List | 等待队列,存储 taskId | | `{prefix}:process:Polling` | - | (已弃用,见下方) | | `{prefix}:processPolling:{AIGC}:{platform}` | Hash | 轮询队列,remoteTaskId → 任务信息 | | `{prefix}:result:queue` | - | (已弃用) | | `{prefix}:result:list` | List | 结果队列,存储 taskId | | `{prefix}:callback` | List | 回调队列,存储 taskId | | `{prefix}:error:queue` | - | (已弃用) | | `{prefix}:error:list` | List | 错误队列,存储 taskId | ### 5.2 任务数据 (Hash) | Key | 类型 | 字段 | | ------------------------ | ------ | ---------------------------------------------------------------------------------------------------------- | | `{prefix}:task:{taskId}` | Hash | `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `resultData`, `remoteTaskId`, `workflowId` | |
|
| **过期时间**: 7200秒 (2小时) | ### 5.3 映射关系 | Key | 类型 | 描述 | | --------------------------------------------------- | ------ | --------------------- | | `{prefix}:callback:{remoteTaskId}` | String | remoteTaskId → taskId | | `{prefix}:pending:messages` | List | 待发送消息键列表 | | `{prefix}:pending:messages:{backendId}:{timestamp}` | Hash | 待发送消息数据 | ### 5.4 初始化配置 (JSON) | Key | 路径 | 描述 | | ------------------- | ---------------------------------- | -------- | | `{prefix}:InitInfo` | `$.waitQueues` | 等待队列名称数组 | |
| `$.processPolling` | 轮询队列名称 | |
| `$.processCallback` | 回调队列名称 | |
| `$.resultName` | 结果队列名称 | |
| `$.PQtasksALL` | 总处理任务数 | |
| `$.RQtasksALL` | 总结果任务数 | |
| `$.CQtasksALL` | 总回调任务数 | |
| `$.EQtaskALL` | 总错误任务数 | |
| `$.platforms.{key}.WQtasks` | 平台等待任务数 | |
| `$.platforms.{key}.PQtasks` | 平台处理任务数 | |
| `$.platforms.{key}.MAX_CONCURRENT` | 平台最大并发数 | |
| `$.platforms.{key}.waitQueue` | 平台等待队列名 | *** ## 六、任务唯一标识与类型 ### 6.1 任务唯一标识 | 标识 | 生成位置 | 用途 | | -------------- | ------ | -------- | | `taskId` | 前端/客户端 | 内部任务唯一标识 | | `remoteTaskId` | 外部平台 | 外部平台任务标识 | | `backendId` | 客户端连接时 | 客户端连接标识 | ### 6.2 任务类型 | 类型 | 说明 | 处理方式 | | -------------- | ------- | ---------- | | 回调型 (callback) | comfyui | 等待外部回调通知 | | 轮询型 (polling) | coze | 主动轮询外部平台状态 | *** ## 七、关键配置文件 ### 7.1 model.json ```json { "digitalHuman-test": { "comfyui": { "apikey": "...", "concurrency": 13 }, "coze": { "apikey": "...", "concurrency": 20 } } } ``` ### 7.2 Platform.json ```json { "callback": ["comfyui"], "polling": ["coze"] } ``` ### 7.3 .env 环境变量 ```env PROJECT_PREFIX='digitalHuman-test' TOKEN_SECRET='...' WS_PORT=8087 CALLBACK_PORT=8089 RunningHub_URL='...' CALLBACK_URL='...' REDIS_URL='...' MESSAGE_DISPATCHER_URL='...' ``` *** ## 八、错误码与消息 ### 8.1 code.json ```json { "ERROR": { "JSONError": "消息格式错误,请联系服务商。", "OpcodeError": "错误提交,请稍后再试。", "BalanceError": "余额不足,请充值后继续使用。", "AssessmentError": "任务提交失败,请稍后再试。" }, "SUCCESS": { "AssessmentSuccess": "任务提交成功,正在排队中..." } } ``` *** ## 九、消息持久化机制 当客户端断开连接时,待发送消息会被保存到 Redis,待客户端重连后重试发送: 1. **保存待发送消息**: `messagePersistence.savePendingMessage()` 2. **客户端重连时获取**: `messagePersistence.getPendingMessages()` 3. **发送成功后删除**: `messagePersistence.removePendingMessage()` 4. **定期清理过期消息**: 超过 2 天的消息自动清理 *** ## 十、流程图 ### 10.1 完整数据流时序图 ```mermaid sequenceDiagram participant Client as 客户端 participant WS as WebSocket Server participant A as Assessment Worker participant Redis as Redis participant W as Wait Worker participant G as Generate Worker participant Ext as 外部平台 participant P as Polling Worker participant R as Result Worker Client->>WS: WebSocket 连接 (token, id) WS-->>Client: 连接成功 Client->>WS: {type: "generate", taskId, platform, payload} WS->>A: 转发任务 A->>A: 参数校验 alt 校验失败 A-->>WS: 返回错误 WS-->>Client: 错误消息 else 校验成功 A->>Redis: HSET {prefix}:task:{taskId} A->>Redis: RPUSH {AIGC}:{platform}:wait A->>Redis: INCR WQtasks A-->>WS: 成功 WS-->>Client: "任务提交成功,正在排队中..." end loop 每15秒检查 W->>Redis: GET InitInfo.platforms W->>Redis: LLEN wait queue alt 有可处理任务 W->>Redis: LRANGE wait queue (批量获取) W->>Redis: HGETALL task data W->>Redis: LTRIM wait queue W->>Redis: DECR WQtasks, INCR PQtasks W->>G: 发送任务数据 G->>Ext: POST /task/create Ext-->>G: {remoteTaskId} alt 提交失败 G->>Redis: HSET resultData (error) G->>Redis: LPUSH error:list G->>Redis: INCR EQtaskALL else 回调型平台 G->>Redis: SET callback:{remoteTaskId} = taskId G->>Redis: HSET remoteTaskId else 轮询型平台 G->>Redis: HSET processPolling:{remoteTaskId} G->>Redis: HSET remoteTaskId end end end alt 回调型平台 Ext->>WS: POST /callback/all WS->>Redis: GET callback:{remoteTaskId} WS->>Redis: HSET resultData WS->>Redis: LPUSH callback WS->>Redis: INCR CQtasksALL else 轮询型平台 loop 每5-30秒轮询 P->>Redis: HGETALL processPolling P->>Ext: GET /task/status alt 任务未完成 P->>P: 继续轮询 else 任务成功 P->>Redis: HSET resultData P->>Redis: LPUSH result:list P->>Redis: HDEL processPolling P->>Redis: DECR PQtasks, INCR RQtasksALL else 任务失败 P->>Redis: HSET resultData (error) P->>Redis: LPUSH error:list P->>Redis: HDEL processPolling P->>Redis: DECR PQtasks, INCR EQtaskALL end end end loop 每15秒检查 R->>Redis: LLEN result:list / error:list alt 有结果 R->>Redis: LRANGE result:list R->>Redis: HGETALL task data R->>WS: 发送结果 WS->>Client: {taskId, result} R->>Redis: LREM result:list R->>Redis: DECR RQtasksALL else 有错误 R->>Redis: LRANGE error:list R->>Redis: HGETALL task data R->>WS: 发送错误 WS->>Client: {taskId, error} R->>Redis: LREM error:list R->>Redis: DEL task:{taskId} R->>Redis: DECR EQtaskALL, DECR WQtasks end end ``` *** ## 十一、总结 该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点: 1. **多线程架构**: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力 2. **Redis 作为核心**: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理 3. **支持多平台**: 可灵活接入不同类型的外部平台(回调型、轮询型) 4. **任务状态追踪**: 完整的任务生命周期管理,从接收、处理到完成/失败 5. **消息持久化**: 支持客户端断线重连后的消息补发 6. **并发控制**: 各平台独立的并发数配置,防止过载 7. **优雅降级**: comfyui 平台支持内部算力和外部平台的自动切换 系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。