24 KiB
任务队列后端 - 业务流程分析报告
一、项目概述
项目名称: ComfyUI 任务队列后端
核心功能: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。
技术栈:
- Node.js + Express
- Redis(任务队列和数据存储)
- WebSocket(实时通信)
- Worker Threads(多线程处理)
二、系统架构
2.1 核心组件
┌─────────────────────────────────────────────────────────────────┐
│ 前端 / 后端客户端 │
└────────────────────────────┬────────────────────────────────────┘
│ WebSocket (8087)
│ HTTP (8089 - 回调)
▼
┌─────────────────────────────────────────────────────────────────┐
│ index.js (主入口) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WebSocket │ │ HTTP Server │ │ Worker │ │
│ │ Server │ │ │ │ Manager │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼───────────────────┼───────────────────┼─────────────────┘
│ │ │
└───────────────────┴───────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Redis │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 等待队列 │ │ 处理队列 │ │ 结果队列 │ │
│ │ (Wait) │ │ (Process) │ │ (Result) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 错误队列 │ │ 任务数据 │ │ 初始化配置 │ │
│ │ (Error) │ │ (Hash) │ │ (InitInfo) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
2.2 Worker Threads 架构
系统使用 6 个独立的 Worker Threads 处理不同阶段的任务:
| Worker 名称 | 功能描述 | 文件路径 |
|---|---|---|
| assessment | 任务预处理、参数校验、入队 | worker_threads/assessment/assessment.js |
| wait | 等待队列监控、任务分发 | worker_threads/wait/waiting.js |
| polling | 轮询外部平台获取任务结果 | worker_threads/process/process.js |
| result | 结果队列处理、返回给客户端 | worker_threads/result/result.js |
| callback_result | 回调结果处理 | worker_threads/callback_result/result.js |
| error | 错误队列处理 | worker_threads/error/error.js |
三、完整业务流程
3.1 流程总览图
flowchart TD
Start([系统启动]) --> Init[初始化 Redis 队列]
Init --> WS[WebSocket 服务启动]
Init --> HTTP[HTTP 服务启动]
WS --> ReceiveTask[接收任务: type=generate]
ReceiveTask --> Assessment[Assessment Worker 处理]
Assessment --> Validate{参数校验}
Validate -->|失败| ErrorReturn[返回错误给客户端]
Validate -->|成功| StoreTask[存储任务到 Redis Hash]
StoreTask --> PushWait[推入等待队列]
PushWait --> NotifySuccess[通知客户端: 任务提交成功]
PushWait --> WaitWorker[Wait Worker 监控]
WaitWorker --> CheckConcurrency{检查并发数}
CheckConcurrency -->|并发未满| GetTasks[批量获取任务]
GetTasks --> GenerateWorker[Generate Worker 分发]
GenerateWorker --> ExternalPost[提交任务到外部平台]
ExternalPost --> PostResult{提交结果}
PostResult -->|失败| PushError[推入错误队列]
PostResult -->|成功| CheckPlatform{平台类型}
CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射]
CheckPlatform -->|轮询型| PushPolling[推入轮询队列]
StoreCallback --> WaitCallback[等待外部回调]
PushPolling --> PollingWorker[Polling Worker 轮询]
WaitCallback --> CallbackReceived[收到回调]
PollingWorker --> PollingResult{轮询结果}
CallbackReceived --> ProcessCallback[处理回调数据]
PollingResult -->|完成| StoreResult[存储结果]
PollingResult -->|未完成| ContinuePolling[继续轮询]
ProcessCallback --> StoreResult
StoreResult --> PushResultQueue[推入结果队列]
PushResultQueue --> ResultWorker[Result Worker 处理]
ResultWorker --> SendResult[发送结果给客户端]
PushError --> ErrorWorker[Error Worker 处理]
ErrorWorker --> SendError[发送错误给客户端]
ContinuePolling --> PollingWorker
3.2 详细流程步骤
阶段 1: 系统启动与初始化
步骤 1.1: 启动主程序
- 入口文件: index.js
- 加载环境变量 (.env)
- 创建 6 个 Worker Threads
步骤 1.2: Redis 初始化
- 连接 Redis
- 初始化队列配置 (initQueue.js)
- 创建
InitInfo配置对象,包含:- 等待队列列表
- 各平台并发配置
- 任务计数器
步骤 1.3: 启动服务
- HTTP 服务器 (端口: 8087)
- WebSocket 服务器 (同端口)
- 回调服务器 (端口: 8089, app.js)
阶段 2: 任务接收与预处理
步骤 2.1: 接收 WebSocket 任务
- 客户端通过 WebSocket 连接,携带 token 和 id
- 验证 token (TOKEN_SECRET)
- 接收
type: "generate"消息
步骤 2.2: Assessment Worker 处理
- 文件: assessment.js
- 内部线程池: 3 个 PreproTask.js
步骤 2.3: 参数校验
- 校验必填字段:
taskId,platform,payload - 失败返回:
JSONError,OpcodeError
步骤 2.4: 存储任务
- 创建任务 Hash:
{prefix}:task:{taskId} - 字段:
taskId,payload,backendId,AIGC,platform,status,workflowId - 设置过期时间: 2 小时 (7200秒)
步骤 2.5: 推入等待队列
- 队列命名:
{AIGC}:{platform}:wait - 例如:
digitalHuman-test:comfyui:wait - 更新
InitInfo.platforms.{key}.WQtasks+1
步骤 2.6: 通知客户端
- 返回成功消息:
"任务提交成功,正在排队中..."
阶段 3: 等待队列监控与任务分发
步骤 3.1: Wait Worker 监控
- 文件: waiting.js
- 循环检查各平台等待队列 (间隔: 15秒)
步骤 3.2: 判断并发容量
- 获取
InitInfo.platforms.{key}.PQtasks(正在处理数) - 获取
InitInfo.platforms.{key}.MAX_CONCURRENT(最大并发数) - 对于 comfyui 平台,还会检查内部算力 (messageDispatcher)
步骤 3.3: 批量获取任务
- 从等待队列获取任务 ID (不超过剩余并发数)
- 批量获取任务数据 (Hash)
步骤 3.4: 更新计数器
WQtasks-N (减少等待数)PQtasks+N (增加处理数)PQtasksALL+N (总处理数)
步骤 3.5: 分发到 Generate Worker
- 文件: GenerateWorkerManager.js
- 内部线程池: GenerateThreadPool.js
阶段 4: 提交任务到外部平台
步骤 4.1: Generate Worker 处理
- 文件: generatTask.js
- 批量调用
externalPostRequest()
步骤 4.2: 外部平台接口调用
- 文件: generat.js
- 平台适配器: outside.js
支持的平台:
| 平台 | 类型 | 适配器文件 |
|---|---|---|
| comfyui | 回调型 | comfyui.js |
| coze | 轮询型 | coze/coze.js |
| jimuai | - | JimuAI.js |
comfyui 分发策略:
- 优先使用
messageDispatcher(内部算力) - 失败降级到
runninghub(外部平台)
步骤 4.3: 处理提交结果
情况 A: 提交失败
- 存储错误信息到任务 Hash
- 推入错误队列:
{prefix}:error:list EQtaskALL+1
情况 B: 提交成功 (回调型平台)
- 存储映射:
{prefix}:callback:{remoteTaskId}→taskId - 更新任务 Hash:
remoteTaskId
情况 C: 提交成功 (轮询型平台)
- 推入轮询队列:
{prefix}:processPolling:{AIGC}:{platform} - Hash 结构:
{remoteTaskId}→{taskId, platform, AIGC, workflowId} - 更新任务 Hash:
remoteTaskId
阶段 5: 任务结果获取
分支 A: 回调型平台 (comfyui)
步骤 5A.1: 接收回调
- 回调接口:
POST /callback/all(callback.js) - 立即返回 200 响应
步骤 5A.2: 处理回调数据
- 通过
remoteTaskId查询taskId - 存储
eventData到任务 Hash 的resultData
- 推入回调队列:
{prefix}:callback CQtasksALL+1
步骤 5A.3: Callback Result Worker 处理
- 文件: callback_result/result.js
- 发送结果给客户端
分支 B: 轮询型平台 (coze)
步骤 5B.1: Polling Worker 监控
- 文件: process.js
- 为每个平台启动独立轮询循环
- 动态轮询间隔: 5-30秒 (根据任务数调整)
步骤 5B.2: 查询外部平台
- 文件: polling.js
- 调用
externalGetRequest() - 批量查询 (每批最多 100 个任务)
步骤 5B.3: 判断任务状态
情况 1: 任务未完成
- 继续轮询 (不做任何操作)
情况 2: 任务成功
- 存储结果到任务 Hash 的
resultData - 更新状态:
status = "success" - 推入结果队列:
{prefix}:result:list - 从轮询队列删除
PQtasks-1,RQtasksALL+1
情况 3: 任务失败
- 存储错误信息到
resultData - 更新状态:
status = "failed" - 推入错误队列:
{prefix}:error:list - 从轮询队列删除
PQtasks-1,EQtaskALL+1
阶段 6: 结果返回给客户端
分支 A: 结果队列处理
- 文件: result/result.js
- 从结果队列获取任务
- 通过 WebSocket 发送给对应
backendId的客户端 - 从结果队列删除
RQtasksALL-1
分支 B: 错误队列处理
- 文件: error/error.js
- 从错误队列获取任务
- 通过 WebSocket 发送错误给客户端
- 从错误队列和任务 Hash 删除
EQtaskALL-1,WQtasks-1
四、任务状态转换图
stateDiagram-v2
[*] --> pending: 任务创建
pending --> processing: 从等待队列取出
processing --> success: 任务成功完成
processing --> failed: 任务失败
success --> [*]: 结果已发送
failed --> [*]: 错误已发送
note right of pending
存储在等待队列
WQtasks +1
end note
note right of processing
存储在处理队列/轮询队列
PQtasks +1
end note
note right of success
存储在结果队列
RQtasksALL +1
end note
note right of failed
存储在错误队列
EQtaskALL +1
end note
五、Redis 数据结构
5.1 队列列表
| Key 模式 | 类型 | 描述 |
|---|---|---|
{prefix}:{AIGC}:{platform}:wait |
List | 等待队列,存储 taskId |
{prefix}:process:Polling |
- | (已弃用,见下方) |
{prefix}:processPolling:{AIGC}:{platform} |
Hash | 轮询队列,remoteTaskId → 任务信息 |
{prefix}:result:queue |
- | (已弃用) |
{prefix}:result:list |
List | 结果队列,存储 taskId |
{prefix}:callback |
List | 回调队列,存储 taskId |
{prefix}:error:queue |
- | (已弃用) |
{prefix}:error:list |
List | 错误队列,存储 taskId |
5.2 任务数据 (Hash)
| Key | 类型 | 字段 |
|---|---|---|
{prefix}:task:{taskId} |
Hash | taskId, payload, backendId, AIGC, platform, status, resultData, remoteTaskId, workflowId |
| 过期时间: 7200秒 (2小时) |
5.3 映射关系
| Key | 类型 | 描述 |
|---|---|---|
{prefix}:callback:{remoteTaskId} |
String | remoteTaskId → taskId |
{prefix}:pending:messages |
List | 待发送消息键列表 |
{prefix}:pending:messages:{backendId}:{timestamp} |
Hash | 待发送消息数据 |
5.4 初始化配置 (JSON)
| Key | 路径 | 描述 |
|---|---|---|
{prefix}:InitInfo |
$.waitQueues |
等待队列名称数组 |
$.processPolling |
轮询队列名称 | |
$.processCallback |
回调队列名称 | |
$.resultName |
结果队列名称 | |
$.PQtasksALL |
总处理任务数 | |
$.RQtasksALL |
总结果任务数 | |
$.CQtasksALL |
总回调任务数 | |
$.EQtaskALL |
总错误任务数 | |
$.platforms.{key}.WQtasks |
平台等待任务数 | |
$.platforms.{key}.PQtasks |
平台处理任务数 | |
$.platforms.{key}.MAX_CONCURRENT |
平台最大并发数 | |
$.platforms.{key}.waitQueue |
平台等待队列名 |
六、任务唯一标识与类型
6.1 任务唯一标识
| 标识 | 生成位置 | 用途 |
|---|---|---|
taskId |
前端/客户端 | 内部任务唯一标识 |
remoteTaskId |
外部平台 | 外部平台任务标识 |
backendId |
客户端连接时 | 客户端连接标识 |
6.2 任务类型
| 类型 | 说明 | 处理方式 |
|---|---|---|
| 回调型 (callback) | comfyui | 等待外部回调通知 |
| 轮询型 (polling) | coze | 主动轮询外部平台状态 |
七、关键配置文件
7.1 model.json
{
"digitalHuman-test": {
"comfyui": {
"apikey": "...",
"concurrency": 13
},
"coze": {
"apikey": "...",
"concurrency": 20
}
}
}
7.2 Platform.json
{
"callback": ["comfyui"],
"polling": ["coze"]
}
7.3 .env 环境变量
PROJECT_PREFIX='digitalHuman-test'
TOKEN_SECRET='...'
WS_PORT=8087
CALLBACK_PORT=8089
RunningHub_URL='...'
CALLBACK_URL='...'
REDIS_URL='...'
MESSAGE_DISPATCHER_URL='...'
八、错误码与消息
8.1 code.json
{
"ERROR": {
"JSONError": "消息格式错误,请联系服务商。",
"OpcodeError": "错误提交,请稍后再试。",
"BalanceError": "余额不足,请充值后继续使用。",
"AssessmentError": "任务提交失败,请稍后再试。"
},
"SUCCESS": {
"AssessmentSuccess": "任务提交成功,正在排队中..."
}
}
九、消息持久化机制
当客户端断开连接时,待发送消息会被保存到 Redis,待客户端重连后重试发送:
- 保存待发送消息:
messagePersistence.savePendingMessage() - 客户端重连时获取:
messagePersistence.getPendingMessages() - 发送成功后删除:
messagePersistence.removePendingMessage() - 定期清理过期消息: 超过 2 天的消息自动清理
十、流程图
10.1 完整数据流时序图
sequenceDiagram
participant Client as 客户端
participant WS as WebSocket Server
participant A as Assessment Worker
participant Redis as Redis
participant W as Wait Worker
participant G as Generate Worker
participant Ext as 外部平台
participant P as Polling Worker
participant R as Result Worker
Client->>WS: WebSocket 连接 (token, id)
WS-->>Client: 连接成功
Client->>WS: {type: "generate", taskId, platform, payload}
WS->>A: 转发任务
A->>A: 参数校验
alt 校验失败
A-->>WS: 返回错误
WS-->>Client: 错误消息
else 校验成功
A->>Redis: HSET {prefix}:task:{taskId}
A->>Redis: RPUSH {AIGC}:{platform}:wait
A->>Redis: INCR WQtasks
A-->>WS: 成功
WS-->>Client: "任务提交成功,正在排队中..."
end
loop 每15秒检查
W->>Redis: GET InitInfo.platforms
W->>Redis: LLEN wait queue
alt 有可处理任务
W->>Redis: LRANGE wait queue (批量获取)
W->>Redis: HGETALL task data
W->>Redis: LTRIM wait queue
W->>Redis: DECR WQtasks, INCR PQtasks
W->>G: 发送任务数据
G->>Ext: POST /task/create
Ext-->>G: {remoteTaskId}
alt 提交失败
G->>Redis: HSET resultData (error)
G->>Redis: LPUSH error:list
G->>Redis: INCR EQtaskALL
else 回调型平台
G->>Redis: SET callback:{remoteTaskId} = taskId
G->>Redis: HSET remoteTaskId
else 轮询型平台
G->>Redis: HSET processPolling:{remoteTaskId}
G->>Redis: HSET remoteTaskId
end
end
end
alt 回调型平台
Ext->>WS: POST /callback/all
WS->>Redis: GET callback:{remoteTaskId}
WS->>Redis: HSET resultData
WS->>Redis: LPUSH callback
WS->>Redis: INCR CQtasksALL
else 轮询型平台
loop 每5-30秒轮询
P->>Redis: HGETALL processPolling
P->>Ext: GET /task/status
alt 任务未完成
P->>P: 继续轮询
else 任务成功
P->>Redis: HSET resultData
P->>Redis: LPUSH result:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR RQtasksALL
else 任务失败
P->>Redis: HSET resultData (error)
P->>Redis: LPUSH error:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR EQtaskALL
end
end
end
loop 每15秒检查
R->>Redis: LLEN result:list / error:list
alt 有结果
R->>Redis: LRANGE result:list
R->>Redis: HGETALL task data
R->>WS: 发送结果
WS->>Client: {taskId, result}
R->>Redis: LREM result:list
R->>Redis: DECR RQtasksALL
else 有错误
R->>Redis: LRANGE error:list
R->>Redis: HGETALL task data
R->>WS: 发送错误
WS->>Client: {taskId, error}
R->>Redis: LREM error:list
R->>Redis: DEL task:{taskId}
R->>Redis: DECR EQtaskALL, DECR WQtasks
end
end
十一、总结
该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点:
- 多线程架构: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力
- Redis 作为核心: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理
- 支持多平台: 可灵活接入不同类型的外部平台(回调型、轮询型)
- 任务状态追踪: 完整的任务生命周期管理,从接收、处理到完成/失败
- 消息持久化: 支持客户端断线重连后的消息补发
- 并发控制: 各平台独立的并发数配置,防止过载
- 优雅降级: comfyui 平台支持内部算力和外部平台的自动切换
系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。