# 任务队列后端 - 业务流程分析报告
## 一、项目概述
**项目名称**: ComfyUI 任务队列后端
**核心功能**: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。
**技术栈**:
- Node.js + Express
- Redis(任务队列和数据存储)
- WebSocket(实时通信)
- Worker Threads(多线程处理)
***
## 二、系统架构
### 2.1 核心组件
```
┌─────────────────────────────────────────────────────────────────┐
│ 前端 / 后端客户端 │
└────────────────────────────┬────────────────────────────────────┘
│ WebSocket (8087)
│ HTTP (8089 - 回调)
▼
┌─────────────────────────────────────────────────────────────────┐
│ index.js (主入口) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WebSocket │ │ HTTP Server │ │ Worker │ │
│ │ Server │ │ │ │ Manager │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼───────────────────┼───────────────────┼─────────────────┘
│ │ │
└───────────────────┴───────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Redis │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 等待队列 │ │ 处理队列 │ │ 结果队列 │ │
│ │ (Wait) │ │ (Process) │ │ (Result) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 错误队列 │ │ 任务数据 │ │ 初始化配置 │ │
│ │ (Error) │ │ (Hash) │ │ (InitInfo) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 2.2 Worker Threads 架构
系统使用 6 个独立的 Worker Threads 处理不同阶段的任务:
| Worker 名称 | 功能描述 | 文件路径 |
| ---------------- | ------------- | ------------------------------------------ |
| assessment | 任务预处理、参数校验、入队 | `worker_threads/assessment/assessment.js` |
| wait | 等待队列监控、任务分发 | `worker_threads/wait/waiting.js` |
| polling | 轮询外部平台获取任务结果 | `worker_threads/process/process.js` |
| result | 结果队列处理、返回给客户端 | `worker_threads/result/result.js` |
| callback\_result | 回调结果处理 | `worker_threads/callback_result/result.js` |
| error | 错误队列处理 | `worker_threads/error/error.js` |
***
## 三、完整业务流程
### 3.1 流程总览图
```mermaid
flowchart TD
Start([系统启动]) --> Init[初始化 Redis 队列]
Init --> WS[WebSocket 服务启动]
Init --> HTTP[HTTP 服务启动]
WS --> ReceiveTask[接收任务: type=generate]
ReceiveTask --> Assessment[Assessment Worker 处理]
Assessment --> Validate{参数校验}
Validate -->|失败| ErrorReturn[返回错误给客户端]
Validate -->|成功| StoreTask[存储任务到 Redis Hash]
StoreTask --> PushWait[推入等待队列]
PushWait --> NotifySuccess[通知客户端: 任务提交成功]
PushWait --> WaitWorker[Wait Worker 监控]
WaitWorker --> CheckConcurrency{检查并发数}
CheckConcurrency -->|并发未满| GetTasks[批量获取任务]
GetTasks --> GenerateWorker[Generate Worker 分发]
GenerateWorker --> ExternalPost[提交任务到外部平台]
ExternalPost --> PostResult{提交结果}
PostResult -->|失败| PushError[推入错误队列]
PostResult -->|成功| CheckPlatform{平台类型}
CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射]
CheckPlatform -->|轮询型| PushPolling[推入轮询队列]
StoreCallback --> WaitCallback[等待外部回调]
PushPolling --> PollingWorker[Polling Worker 轮询]
WaitCallback --> CallbackReceived[收到回调]
PollingWorker --> PollingResult{轮询结果}
CallbackReceived --> ProcessCallback[处理回调数据]
PollingResult -->|完成| StoreResult[存储结果]
PollingResult -->|未完成| ContinuePolling[继续轮询]
ProcessCallback --> StoreResult
StoreResult --> PushResultQueue[推入结果队列]
PushResultQueue --> ResultWorker[Result Worker 处理]
ResultWorker --> SendResult[发送结果给客户端]
PushError --> ErrorWorker[Error Worker 处理]
ErrorWorker --> SendError[发送错误给客户端]
ContinuePolling --> PollingWorker
```
### 3.2 详细流程步骤
#### 阶段 1: 系统启动与初始化
**步骤 1.1: 启动主程序**
- 入口文件: [index.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\index.js)
- 加载环境变量 (.env)
- 创建 6 个 Worker Threads
**步骤 1.2: Redis 初始化**
- 连接 Redis
- 初始化队列配置 ([initQueue.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\redis\initQueue.js))
- 创建 `InitInfo` 配置对象,包含:
- 等待队列列表
- 各平台并发配置
- 任务计数器
**步骤 1.3: 启动服务**
- HTTP 服务器 (端口: 8087)
- WebSocket 服务器 (同端口)
- 回调服务器 (端口: 8089, [app.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\app.js))
***
#### 阶段 2: 任务接收与预处理
**步骤 2.1: 接收 WebSocket 任务**
- 客户端通过 WebSocket 连接,携带 token 和 id
- 验证 token (TOKEN\_SECRET)
- 接收 `type: "generate"` 消息
**步骤 2.2: Assessment Worker 处理**
- 文件: [assessment.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\assessment.js)
- 内部线程池: 3 个 [PreproTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\PreproTask.js)
**步骤 2.3: 参数校验**
- 校验必填字段: `taskId`, `platform`, `payload`
- 失败返回: `JSONError`, `OpcodeError`
**步骤 2.4: 存储任务**
- 创建任务 Hash: `{prefix}:task:{taskId}`
- 字段: `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `workflowId`
- 设置过期时间: 2 小时 (7200秒)
**步骤 2.5: 推入等待队列**
- 队列命名: `{AIGC}:{platform}:wait`
- 例如: `digitalHuman-test:comfyui:wait`
- 更新 `InitInfo.platforms.{key}.WQtasks` +1
**步骤 2.6: 通知客户端**
- 返回成功消息: `"任务提交成功,正在排队中..."`
***
#### 阶段 3: 等待队列监控与任务分发
**步骤 3.1: Wait Worker 监控**
- 文件: [waiting.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\waiting.js)
- 循环检查各平台等待队列 (间隔: 15秒)
**步骤 3.2: 判断并发容量**
- 获取 `InitInfo.platforms.{key}.PQtasks` (正在处理数)
- 获取 `InitInfo.platforms.{key}.MAX_CONCURRENT` (最大并发数)
- 对于 comfyui 平台,还会检查内部算力 (messageDispatcher)
**步骤 3.3: 批量获取任务**
- 从等待队列获取任务 ID (不超过剩余并发数)
- 批量获取任务数据 (Hash)
**步骤 3.4: 更新计数器**
- `WQtasks` -N (减少等待数)
- `PQtasks` +N (增加处理数)
- `PQtasksALL` +N (总处理数)
**步骤 3.5: 分发到 Generate Worker**
- 文件: [GenerateWorkerManager.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateWorkerManager.js)
- 内部线程池: [GenerateThreadPool.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateThreadPool.js)
***
#### 阶段 4: 提交任务到外部平台
**步骤 4.1: Generate Worker 处理**
- 文件: [generatTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\generatTask.js)
- 批量调用 `externalPostRequest()`
**步骤 4.2: 外部平台接口调用**
- 文件: [generat.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\generat.js)
- 平台适配器: [outside.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\outside.js)
**支持的平台**:
| 平台 | 类型 | 适配器文件 |
| ------- | --- | -------------------------------------------------------------------------------------------------------------- |
| comfyui | 回调型 | [comfyui.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\comfyui.js) |
| coze | 轮询型 | [coze/coze.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\coze\coze.js) |
| jimuai | - | [JimuAI.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\JimuAI.js) |
**comfyui 分发策略**:
- 优先使用 `messageDispatcher` (内部算力)
- 失败降级到 `runninghub` (外部平台)
**步骤 4.3: 处理提交结果**
**情况 A: 提交失败**
- 存储错误信息到任务 Hash
- 推入错误队列: `{prefix}:error:list`
- `EQtaskALL` +1
**情况 B: 提交成功 (回调型平台)**
- 存储映射: `{prefix}:callback:{remoteTaskId}` → `taskId`
- 更新任务 Hash: `remoteTaskId`
**情况 C: 提交成功 (轮询型平台)**
- 推入轮询队列: `{prefix}:processPolling:{AIGC}:{platform}`
- Hash 结构: `{remoteTaskId}` → `{taskId, platform, AIGC, workflowId}`
- 更新任务 Hash: `remoteTaskId`
***
#### 阶段 5: 任务结果获取
**分支 A: 回调型平台 (comfyui)**
**步骤 5A.1: 接收回调**
- 回调接口: `POST /callback/all` ([callback.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\callback.js))
- 立即返回 200 响应
**步骤 5A.2: 处理回调数据**
- 通过 `remoteTaskId` 查询 `taskId`
- 存储 `eventData` 到任务 Hash 的 `resultData`
* 推入回调队列: `{prefix}:callback`
* `CQtasksALL` +1
**步骤 5A.3: Callback Result Worker 处理**
- 文件: [callback\_result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\callback_result\result.js)
- 发送结果给客户端
***
**分支 B: 轮询型平台 (coze)**
**步骤 5B.1: Polling Worker 监控**
- 文件: [process.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\process\process.js)
- 为每个平台启动独立轮询循环
- 动态轮询间隔: 5-30秒 (根据任务数调整)
**步骤 5B.2: 查询外部平台**
- 文件: [polling.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\polling.js)
- 调用 `externalGetRequest()`
- 批量查询 (每批最多 100 个任务)
**步骤 5B.3: 判断任务状态**
**情况 1: 任务未完成**
- 继续轮询 (不做任何操作)
**情况 2: 任务成功**
- 存储结果到任务 Hash 的 `resultData`
- 更新状态: `status = "success"`
- 推入结果队列: `{prefix}:result:list`
- 从轮询队列删除
- `PQtasks` -1, `RQtasksALL` +1
**情况 3: 任务失败**
- 存储错误信息到 `resultData`
- 更新状态: `status = "failed"`
- 推入错误队列: `{prefix}:error:list`
- 从轮询队列删除
- `PQtasks` -1, `EQtaskALL` +1
***
#### 阶段 6: 结果返回给客户端
**分支 A: 结果队列处理**
- 文件: [result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\result\result.js)
- 从结果队列获取任务
- 通过 WebSocket 发送给对应 `backendId` 的客户端
- 从结果队列删除
- `RQtasksALL` -1
**分支 B: 错误队列处理**
- 文件: [error/error.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\error\error.js)
- 从错误队列获取任务
- 通过 WebSocket 发送错误给客户端
- 从错误队列和任务 Hash 删除
- `EQtaskALL` -1, `WQtasks` -1
***
## 四、任务状态转换图
```mermaid
stateDiagram-v2
[*] --> pending: 任务创建
pending --> processing: 从等待队列取出
processing --> success: 任务成功完成
processing --> failed: 任务失败
success --> [*]: 结果已发送
failed --> [*]: 错误已发送
note right of pending
存储在等待队列
WQtasks +1
end note
note right of processing
存储在处理队列/轮询队列
PQtasks +1
end note
note right of success
存储在结果队列
RQtasksALL +1
end note
note right of failed
存储在错误队列
EQtaskALL +1
end note
```
***
## 五、Redis 数据结构
### 5.1 队列列表
| Key 模式 | 类型 | 描述 |
| ------------------------------------------- | ---- | ------------------------ |
| `{prefix}:{AIGC}:{platform}:wait` | List | 等待队列,存储 taskId |
| `{prefix}:process:Polling` | - | (已弃用,见下方) |
| `{prefix}:processPolling:{AIGC}:{platform}` | Hash | 轮询队列,remoteTaskId → 任务信息 |
| `{prefix}:result:queue` | - | (已弃用) |
| `{prefix}:result:list` | List | 结果队列,存储 taskId |
| `{prefix}:callback` | List | 回调队列,存储 taskId |
| `{prefix}:error:queue` | - | (已弃用) |
| `{prefix}:error:list` | List | 错误队列,存储 taskId |
### 5.2 任务数据 (Hash)
| Key | 类型 | 字段 |
| ------------------------ | ------ | ---------------------------------------------------------------------------------------------------------- |
| `{prefix}:task:{taskId}` | Hash | `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `resultData`, `remoteTaskId`, `workflowId` |
|
|
| **过期时间**: 7200秒 (2小时) |
### 5.3 映射关系
| Key | 类型 | 描述 |
| --------------------------------------------------- | ------ | --------------------- |
| `{prefix}:callback:{remoteTaskId}` | String | remoteTaskId → taskId |
| `{prefix}:pending:messages` | List | 待发送消息键列表 |
| `{prefix}:pending:messages:{backendId}:{timestamp}` | Hash | 待发送消息数据 |
### 5.4 初始化配置 (JSON)
| Key | 路径 | 描述 |
| ------------------- | ---------------------------------- | -------- |
| `{prefix}:InitInfo` | `$.waitQueues` | 等待队列名称数组 |
|
| `$.processPolling` | 轮询队列名称 |
|
| `$.processCallback` | 回调队列名称 |
|
| `$.resultName` | 结果队列名称 |
|
| `$.PQtasksALL` | 总处理任务数 |
|
| `$.RQtasksALL` | 总结果任务数 |
|
| `$.CQtasksALL` | 总回调任务数 |
|
| `$.EQtaskALL` | 总错误任务数 |
|
| `$.platforms.{key}.WQtasks` | 平台等待任务数 |
|
| `$.platforms.{key}.PQtasks` | 平台处理任务数 |
|
| `$.platforms.{key}.MAX_CONCURRENT` | 平台最大并发数 |
|
| `$.platforms.{key}.waitQueue` | 平台等待队列名 |
***
## 六、任务唯一标识与类型
### 6.1 任务唯一标识
| 标识 | 生成位置 | 用途 |
| -------------- | ------ | -------- |
| `taskId` | 前端/客户端 | 内部任务唯一标识 |
| `remoteTaskId` | 外部平台 | 外部平台任务标识 |
| `backendId` | 客户端连接时 | 客户端连接标识 |
### 6.2 任务类型
| 类型 | 说明 | 处理方式 |
| -------------- | ------- | ---------- |
| 回调型 (callback) | comfyui | 等待外部回调通知 |
| 轮询型 (polling) | coze | 主动轮询外部平台状态 |
***
## 七、关键配置文件
### 7.1 model.json
```json
{
"digitalHuman-test": {
"comfyui": {
"apikey": "...",
"concurrency": 13
},
"coze": {
"apikey": "...",
"concurrency": 20
}
}
}
```
### 7.2 Platform.json
```json
{
"callback": ["comfyui"],
"polling": ["coze"]
}
```
### 7.3 .env 环境变量
```env
PROJECT_PREFIX='digitalHuman-test'
TOKEN_SECRET='...'
WS_PORT=8087
CALLBACK_PORT=8089
RunningHub_URL='...'
CALLBACK_URL='...'
REDIS_URL='...'
MESSAGE_DISPATCHER_URL='...'
```
***
## 八、错误码与消息
### 8.1 code.json
```json
{
"ERROR": {
"JSONError": "消息格式错误,请联系服务商。",
"OpcodeError": "错误提交,请稍后再试。",
"BalanceError": "余额不足,请充值后继续使用。",
"AssessmentError": "任务提交失败,请稍后再试。"
},
"SUCCESS": {
"AssessmentSuccess": "任务提交成功,正在排队中..."
}
}
```
***
## 九、消息持久化机制
当客户端断开连接时,待发送消息会被保存到 Redis,待客户端重连后重试发送:
1. **保存待发送消息**: `messagePersistence.savePendingMessage()`
2. **客户端重连时获取**: `messagePersistence.getPendingMessages()`
3. **发送成功后删除**: `messagePersistence.removePendingMessage()`
4. **定期清理过期消息**: 超过 2 天的消息自动清理
***
## 十、流程图
### 10.1 完整数据流时序图
```mermaid
sequenceDiagram
participant Client as 客户端
participant WS as WebSocket Server
participant A as Assessment Worker
participant Redis as Redis
participant W as Wait Worker
participant G as Generate Worker
participant Ext as 外部平台
participant P as Polling Worker
participant R as Result Worker
Client->>WS: WebSocket 连接 (token, id)
WS-->>Client: 连接成功
Client->>WS: {type: "generate", taskId, platform, payload}
WS->>A: 转发任务
A->>A: 参数校验
alt 校验失败
A-->>WS: 返回错误
WS-->>Client: 错误消息
else 校验成功
A->>Redis: HSET {prefix}:task:{taskId}
A->>Redis: RPUSH {AIGC}:{platform}:wait
A->>Redis: INCR WQtasks
A-->>WS: 成功
WS-->>Client: "任务提交成功,正在排队中..."
end
loop 每15秒检查
W->>Redis: GET InitInfo.platforms
W->>Redis: LLEN wait queue
alt 有可处理任务
W->>Redis: LRANGE wait queue (批量获取)
W->>Redis: HGETALL task data
W->>Redis: LTRIM wait queue
W->>Redis: DECR WQtasks, INCR PQtasks
W->>G: 发送任务数据
G->>Ext: POST /task/create
Ext-->>G: {remoteTaskId}
alt 提交失败
G->>Redis: HSET resultData (error)
G->>Redis: LPUSH error:list
G->>Redis: INCR EQtaskALL
else 回调型平台
G->>Redis: SET callback:{remoteTaskId} = taskId
G->>Redis: HSET remoteTaskId
else 轮询型平台
G->>Redis: HSET processPolling:{remoteTaskId}
G->>Redis: HSET remoteTaskId
end
end
end
alt 回调型平台
Ext->>WS: POST /callback/all
WS->>Redis: GET callback:{remoteTaskId}
WS->>Redis: HSET resultData
WS->>Redis: LPUSH callback
WS->>Redis: INCR CQtasksALL
else 轮询型平台
loop 每5-30秒轮询
P->>Redis: HGETALL processPolling
P->>Ext: GET /task/status
alt 任务未完成
P->>P: 继续轮询
else 任务成功
P->>Redis: HSET resultData
P->>Redis: LPUSH result:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR RQtasksALL
else 任务失败
P->>Redis: HSET resultData (error)
P->>Redis: LPUSH error:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR EQtaskALL
end
end
end
loop 每15秒检查
R->>Redis: LLEN result:list / error:list
alt 有结果
R->>Redis: LRANGE result:list
R->>Redis: HGETALL task data
R->>WS: 发送结果
WS->>Client: {taskId, result}
R->>Redis: LREM result:list
R->>Redis: DECR RQtasksALL
else 有错误
R->>Redis: LRANGE error:list
R->>Redis: HGETALL task data
R->>WS: 发送错误
WS->>Client: {taskId, error}
R->>Redis: LREM error:list
R->>Redis: DEL task:{taskId}
R->>Redis: DECR EQtaskALL, DECR WQtasks
end
end
```
***
## 十一、总结
该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点:
1. **多线程架构**: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力
2. **Redis 作为核心**: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理
3. **支持多平台**: 可灵活接入不同类型的外部平台(回调型、轮询型)
4. **任务状态追踪**: 完整的任务生命周期管理,从接收、处理到完成/失败
5. **消息持久化**: 支持客户端断线重连后的消息补发
6. **并发控制**: 各平台独立的并发数配置,防止过载
7. **优雅降级**: comfyui 平台支持内部算力和外部平台的自动切换
系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。