shuzhiren-comfyui/任务队列后端/业务流程分析报告.md

640 lines
24 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 任务队列后端 - 业务流程分析报告
## 一、项目概述
**项目名称**: ComfyUI 任务队列后端
**核心功能**: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。
**技术栈**:
- Node.js + Express
- Redis任务队列和数据存储
- WebSocket实时通信
- Worker Threads多线程处理
***
## 二、系统架构
### 2.1 核心组件
```
┌─────────────────────────────────────────────────────────────────┐
│ 前端 / 后端客户端 │
└────────────────────────────┬────────────────────────────────────┘
│ WebSocket (8087)
│ HTTP (8089 - 回调)
┌─────────────────────────────────────────────────────────────────┐
│ index.js (主入口) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ WebSocket │ │ HTTP Server │ │ Worker │ │
│ │ Server │ │ │ │ Manager │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼───────────────────┼───────────────────┼─────────────────┘
│ │ │
└───────────────────┴───────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Redis │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 等待队列 │ │ 处理队列 │ │ 结果队列 │ │
│ │ (Wait) │ │ (Process) │ │ (Result) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 错误队列 │ │ 任务数据 │ │ 初始化配置 │ │
│ │ (Error) │ │ (Hash) │ │ (InitInfo) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 2.2 Worker Threads 架构
系统使用 6 个独立的 Worker Threads 处理不同阶段的任务:
| Worker 名称 | 功能描述 | 文件路径 |
| ---------------- | ------------- | ------------------------------------------ |
| assessment | 任务预处理、参数校验、入队 | `worker_threads/assessment/assessment.js` |
| wait | 等待队列监控、任务分发 | `worker_threads/wait/waiting.js` |
| polling | 轮询外部平台获取任务结果 | `worker_threads/process/process.js` |
| result | 结果队列处理、返回给客户端 | `worker_threads/result/result.js` |
| callback\_result | 回调结果处理 | `worker_threads/callback_result/result.js` |
| error | 错误队列处理 | `worker_threads/error/error.js` |
***
## 三、完整业务流程
### 3.1 流程总览图
```mermaid
flowchart TD
Start([系统启动]) --> Init[初始化 Redis 队列]
Init --> WS[WebSocket 服务启动]
Init --> HTTP[HTTP 服务启动]
WS --> ReceiveTask[接收任务: type=generate]
ReceiveTask --> Assessment[Assessment Worker 处理]
Assessment --> Validate{参数校验}
Validate -->|失败| ErrorReturn[返回错误给客户端]
Validate -->|成功| StoreTask[存储任务到 Redis Hash]
StoreTask --> PushWait[推入等待队列]
PushWait --> NotifySuccess[通知客户端: 任务提交成功]
PushWait --> WaitWorker[Wait Worker 监控]
WaitWorker --> CheckConcurrency{检查并发数}
CheckConcurrency -->|并发未满| GetTasks[批量获取任务]
GetTasks --> GenerateWorker[Generate Worker 分发]
GenerateWorker --> ExternalPost[提交任务到外部平台]
ExternalPost --> PostResult{提交结果}
PostResult -->|失败| PushError[推入错误队列]
PostResult -->|成功| CheckPlatform{平台类型}
CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射]
CheckPlatform -->|轮询型| PushPolling[推入轮询队列]
StoreCallback --> WaitCallback[等待外部回调]
PushPolling --> PollingWorker[Polling Worker 轮询]
WaitCallback --> CallbackReceived[收到回调]
PollingWorker --> PollingResult{轮询结果}
CallbackReceived --> ProcessCallback[处理回调数据]
PollingResult -->|完成| StoreResult[存储结果]
PollingResult -->|未完成| ContinuePolling[继续轮询]
ProcessCallback --> StoreResult
StoreResult --> PushResultQueue[推入结果队列]
PushResultQueue --> ResultWorker[Result Worker 处理]
ResultWorker --> SendResult[发送结果给客户端]
PushError --> ErrorWorker[Error Worker 处理]
ErrorWorker --> SendError[发送错误给客户端]
ContinuePolling --> PollingWorker
```
### 3.2 详细流程步骤
#### 阶段 1: 系统启动与初始化
**步骤 1.1: 启动主程序**
- 入口文件: [index.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\index.js)
- 加载环境变量 (.env)
- 创建 6 个 Worker Threads
**步骤 1.2: Redis 初始化**
- 连接 Redis
- 初始化队列配置 ([initQueue.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\redis\initQueue.js))
- 创建 `InitInfo` 配置对象,包含:
- 等待队列列表
- 各平台并发配置
- 任务计数器
**步骤 1.3: 启动服务**
- HTTP 服务器 (端口: 8087)
- WebSocket 服务器 (同端口)
- 回调服务器 (端口: 8089, [app.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\app.js))
***
#### 阶段 2: 任务接收与预处理
**步骤 2.1: 接收 WebSocket 任务**
- 客户端通过 WebSocket 连接,携带 token 和 id
- 验证 token (TOKEN\_SECRET)
- 接收 `type: "generate"` 消息
**步骤 2.2: Assessment Worker 处理**
- 文件: [assessment.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\assessment.js)
- 内部线程池: 3 个 [PreproTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\PreproTask.js)
**步骤 2.3: 参数校验**
- 校验必填字段: `taskId`, `platform`, `payload`
- 失败返回: `JSONError`, `OpcodeError`
**步骤 2.4: 存储任务**
- 创建任务 Hash: `{prefix}:task:{taskId}`
- 字段: `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `workflowId`
- 设置过期时间: 2 小时 (7200秒)
**步骤 2.5: 推入等待队列**
- 队列命名: `{AIGC}:{platform}:wait`
- 例如: `digitalHuman-test:comfyui:wait`
- 更新 `InitInfo.platforms.{key}.WQtasks` +1
**步骤 2.6: 通知客户端**
- 返回成功消息: `"任务提交成功,正在排队中..."`
***
#### 阶段 3: 等待队列监控与任务分发
**步骤 3.1: Wait Worker 监控**
- 文件: [waiting.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\waiting.js)
- 循环检查各平台等待队列 (间隔: 15秒)
**步骤 3.2: 判断并发容量**
- 获取 `InitInfo.platforms.{key}.PQtasks` (正在处理数)
- 获取 `InitInfo.platforms.{key}.MAX_CONCURRENT` (最大并发数)
- 对于 comfyui 平台,还会检查内部算力 (messageDispatcher)
**步骤 3.3: 批量获取任务**
- 从等待队列获取任务 ID (不超过剩余并发数)
- 批量获取任务数据 (Hash)
**步骤 3.4: 更新计数器**
- `WQtasks` -N (减少等待数)
- `PQtasks` +N (增加处理数)
- `PQtasksALL` +N (总处理数)
**步骤 3.5: 分发到 Generate Worker**
- 文件: [GenerateWorkerManager.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateWorkerManager.js)
- 内部线程池: [GenerateThreadPool.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateThreadPool.js)
***
#### 阶段 4: 提交任务到外部平台
**步骤 4.1: Generate Worker 处理**
- 文件: [generatTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\generatTask.js)
- 批量调用 `externalPostRequest()`
**步骤 4.2: 外部平台接口调用**
- 文件: [generat.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\generat.js)
- 平台适配器: [outside.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\outside.js)
**支持的平台**:
| 平台 | 类型 | 适配器文件 |
| ------- | --- | -------------------------------------------------------------------------------------------------------------- |
| comfyui | 回调型 | [comfyui.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\comfyui.js) |
| coze | 轮询型 | [coze/coze.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\coze\coze.js) |
| jimuai | - | [JimuAI.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\JimuAI.js) |
**comfyui 分发策略**:
- 优先使用 `messageDispatcher` (内部算力)
- 失败降级到 `runninghub` (外部平台)
**步骤 4.3: 处理提交结果**
**情况 A: 提交失败**
- 存储错误信息到任务 Hash
- 推入错误队列: `{prefix}:error:list`
- `EQtaskALL` +1
**情况 B: 提交成功 (回调型平台)**
- 存储映射: `{prefix}:callback:{remoteTaskId}``taskId`
- 更新任务 Hash: `remoteTaskId`
**情况 C: 提交成功 (轮询型平台)**
- 推入轮询队列: `{prefix}:processPolling:{AIGC}:{platform}`
- Hash 结构: `{remoteTaskId}``{taskId, platform, AIGC, workflowId}`
- 更新任务 Hash: `remoteTaskId`
***
#### 阶段 5: 任务结果获取
**分支 A: 回调型平台 (comfyui)**
**步骤 5A.1: 接收回调**
- 回调接口: `POST /callback/all` ([callback.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\callback.js))
- 立即返回 200 响应
**步骤 5A.2: 处理回调数据**
- 通过 `remoteTaskId` 查询 `taskId`
- 存储 `eventData` 到任务 Hash 的 `resultData`
* 推入回调队列: `{prefix}:callback`
* `CQtasksALL` +1
**步骤 5A.3: Callback Result Worker 处理**
- 文件: [callback\_result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\callback_result\result.js)
- 发送结果给客户端
***
**分支 B: 轮询型平台 (coze)**
**步骤 5B.1: Polling Worker 监控**
- 文件: [process.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\process\process.js)
- 为每个平台启动独立轮询循环
- 动态轮询间隔: 5-30秒 (根据任务数调整)
**步骤 5B.2: 查询外部平台**
- 文件: [polling.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\polling.js)
- 调用 `externalGetRequest()`
- 批量查询 (每批最多 100 个任务)
**步骤 5B.3: 判断任务状态**
**情况 1: 任务未完成**
- 继续轮询 (不做任何操作)
**情况 2: 任务成功**
- 存储结果到任务 Hash 的 `resultData`
- 更新状态: `status = "success"`
- 推入结果队列: `{prefix}:result:list`
- 从轮询队列删除
- `PQtasks` -1, `RQtasksALL` +1
**情况 3: 任务失败**
- 存储错误信息到 `resultData`
- 更新状态: `status = "failed"`
- 推入错误队列: `{prefix}:error:list`
- 从轮询队列删除
- `PQtasks` -1, `EQtaskALL` +1
***
#### 阶段 6: 结果返回给客户端
**分支 A: 结果队列处理**
- 文件: [result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\result\result.js)
- 从结果队列获取任务
- 通过 WebSocket 发送给对应 `backendId` 的客户端
- 从结果队列删除
- `RQtasksALL` -1
**分支 B: 错误队列处理**
- 文件: [error/error.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\error\error.js)
- 从错误队列获取任务
- 通过 WebSocket 发送错误给客户端
- 从错误队列和任务 Hash 删除
- `EQtaskALL` -1, `WQtasks` -1
***
## 四、任务状态转换图
```mermaid
stateDiagram-v2
[*] --> pending: 任务创建
pending --> processing: 从等待队列取出
processing --> success: 任务成功完成
processing --> failed: 任务失败
success --> [*]: 结果已发送
failed --> [*]: 错误已发送
note right of pending
存储在等待队列
WQtasks +1
end note
note right of processing
存储在处理队列/轮询队列
PQtasks +1
end note
note right of success
存储在结果队列
RQtasksALL +1
end note
note right of failed
存储在错误队列
EQtaskALL +1
end note
```
***
## 五、Redis 数据结构
### 5.1 队列列表
| Key 模式 | 类型 | 描述 |
| ------------------------------------------- | ---- | ------------------------ |
| `{prefix}:{AIGC}:{platform}:wait` | List | 等待队列,存储 taskId |
| `{prefix}:process:Polling` | - | (已弃用,见下方) |
| `{prefix}:processPolling:{AIGC}:{platform}` | Hash | 轮询队列remoteTaskId → 任务信息 |
| `{prefix}:result:queue` | - | (已弃用) |
| `{prefix}:result:list` | List | 结果队列,存储 taskId |
| `{prefix}:callback` | List | 回调队列,存储 taskId |
| `{prefix}:error:queue` | - | (已弃用) |
| `{prefix}:error:list` | List | 错误队列,存储 taskId |
### 5.2 任务数据 (Hash)
| Key | 类型 | 字段 |
| ------------------------ | ------ | ---------------------------------------------------------------------------------------------------------- |
| `{prefix}:task:{taskId}` | Hash | `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `resultData`, `remoteTaskId`, `workflowId` |
| <br /> | <br /> | **过期时间**: 7200秒 (2小时) |
### 5.3 映射关系
| Key | 类型 | 描述 |
| --------------------------------------------------- | ------ | --------------------- |
| `{prefix}:callback:{remoteTaskId}` | String | remoteTaskId → taskId |
| `{prefix}:pending:messages` | List | 待发送消息键列表 |
| `{prefix}:pending:messages:{backendId}:{timestamp}` | Hash | 待发送消息数据 |
### 5.4 初始化配置 (JSON)
| Key | 路径 | 描述 |
| ------------------- | ---------------------------------- | -------- |
| `{prefix}:InitInfo` | `$.waitQueues` | 等待队列名称数组 |
| <br /> | `$.processPolling` | 轮询队列名称 |
| <br /> | `$.processCallback` | 回调队列名称 |
| <br /> | `$.resultName` | 结果队列名称 |
| <br /> | `$.PQtasksALL` | 总处理任务数 |
| <br /> | `$.RQtasksALL` | 总结果任务数 |
| <br /> | `$.CQtasksALL` | 总回调任务数 |
| <br /> | `$.EQtaskALL` | 总错误任务数 |
| <br /> | `$.platforms.{key}.WQtasks` | 平台等待任务数 |
| <br /> | `$.platforms.{key}.PQtasks` | 平台处理任务数 |
| <br /> | `$.platforms.{key}.MAX_CONCURRENT` | 平台最大并发数 |
| <br /> | `$.platforms.{key}.waitQueue` | 平台等待队列名 |
***
## 六、任务唯一标识与类型
### 6.1 任务唯一标识
| 标识 | 生成位置 | 用途 |
| -------------- | ------ | -------- |
| `taskId` | 前端/客户端 | 内部任务唯一标识 |
| `remoteTaskId` | 外部平台 | 外部平台任务标识 |
| `backendId` | 客户端连接时 | 客户端连接标识 |
### 6.2 任务类型
| 类型 | 说明 | 处理方式 |
| -------------- | ------- | ---------- |
| 回调型 (callback) | comfyui | 等待外部回调通知 |
| 轮询型 (polling) | coze | 主动轮询外部平台状态 |
***
## 七、关键配置文件
### 7.1 model.json
```json
{
"digitalHuman-test": {
"comfyui": {
"apikey": "...",
"concurrency": 13
},
"coze": {
"apikey": "...",
"concurrency": 20
}
}
}
```
### 7.2 Platform.json
```json
{
"callback": ["comfyui"],
"polling": ["coze"]
}
```
### 7.3 .env 环境变量
```env
PROJECT_PREFIX='digitalHuman-test'
TOKEN_SECRET='...'
WS_PORT=8087
CALLBACK_PORT=8089
RunningHub_URL='...'
CALLBACK_URL='...'
REDIS_URL='...'
MESSAGE_DISPATCHER_URL='...'
```
***
## 八、错误码与消息
### 8.1 code.json
```json
{
"ERROR": {
"JSONError": "消息格式错误,请联系服务商。",
"OpcodeError": "错误提交,请稍后再试。",
"BalanceError": "余额不足,请充值后继续使用。",
"AssessmentError": "任务提交失败,请稍后再试。"
},
"SUCCESS": {
"AssessmentSuccess": "任务提交成功,正在排队中..."
}
}
```
***
## 九、消息持久化机制
当客户端断开连接时,待发送消息会被保存到 Redis待客户端重连后重试发送
1. **保存待发送消息**: `messagePersistence.savePendingMessage()`
2. **客户端重连时获取**: `messagePersistence.getPendingMessages()`
3. **发送成功后删除**: `messagePersistence.removePendingMessage()`
4. **定期清理过期消息**: 超过 2 天的消息自动清理
***
## 十、流程图
### 10.1 完整数据流时序图
```mermaid
sequenceDiagram
participant Client as 客户端
participant WS as WebSocket Server
participant A as Assessment Worker
participant Redis as Redis
participant W as Wait Worker
participant G as Generate Worker
participant Ext as 外部平台
participant P as Polling Worker
participant R as Result Worker
Client->>WS: WebSocket 连接 (token, id)
WS-->>Client: 连接成功
Client->>WS: {type: "generate", taskId, platform, payload}
WS->>A: 转发任务
A->>A: 参数校验
alt 校验失败
A-->>WS: 返回错误
WS-->>Client: 错误消息
else 校验成功
A->>Redis: HSET {prefix}:task:{taskId}
A->>Redis: RPUSH {AIGC}:{platform}:wait
A->>Redis: INCR WQtasks
A-->>WS: 成功
WS-->>Client: "任务提交成功,正在排队中..."
end
loop 每15秒检查
W->>Redis: GET InitInfo.platforms
W->>Redis: LLEN wait queue
alt 有可处理任务
W->>Redis: LRANGE wait queue (批量获取)
W->>Redis: HGETALL task data
W->>Redis: LTRIM wait queue
W->>Redis: DECR WQtasks, INCR PQtasks
W->>G: 发送任务数据
G->>Ext: POST /task/create
Ext-->>G: {remoteTaskId}
alt 提交失败
G->>Redis: HSET resultData (error)
G->>Redis: LPUSH error:list
G->>Redis: INCR EQtaskALL
else 回调型平台
G->>Redis: SET callback:{remoteTaskId} = taskId
G->>Redis: HSET remoteTaskId
else 轮询型平台
G->>Redis: HSET processPolling:{remoteTaskId}
G->>Redis: HSET remoteTaskId
end
end
end
alt 回调型平台
Ext->>WS: POST /callback/all
WS->>Redis: GET callback:{remoteTaskId}
WS->>Redis: HSET resultData
WS->>Redis: LPUSH callback
WS->>Redis: INCR CQtasksALL
else 轮询型平台
loop 每5-30秒轮询
P->>Redis: HGETALL processPolling
P->>Ext: GET /task/status
alt 任务未完成
P->>P: 继续轮询
else 任务成功
P->>Redis: HSET resultData
P->>Redis: LPUSH result:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR RQtasksALL
else 任务失败
P->>Redis: HSET resultData (error)
P->>Redis: LPUSH error:list
P->>Redis: HDEL processPolling
P->>Redis: DECR PQtasks, INCR EQtaskALL
end
end
end
loop 每15秒检查
R->>Redis: LLEN result:list / error:list
alt 有结果
R->>Redis: LRANGE result:list
R->>Redis: HGETALL task data
R->>WS: 发送结果
WS->>Client: {taskId, result}
R->>Redis: LREM result:list
R->>Redis: DECR RQtasksALL
else 有错误
R->>Redis: LRANGE error:list
R->>Redis: HGETALL task data
R->>WS: 发送错误
WS->>Client: {taskId, error}
R->>Redis: LREM error:list
R->>Redis: DEL task:{taskId}
R->>Redis: DECR EQtaskALL, DECR WQtasks
end
end
```
***
## 十一、总结
该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点:
1. **多线程架构**: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力
2. **Redis 作为核心**: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理
3. **支持多平台**: 可灵活接入不同类型的外部平台(回调型、轮询型)
4. **任务状态追踪**: 完整的任务生命周期管理,从接收、处理到完成/失败
5. **消息持久化**: 支持客户端断线重连后的消息补发
6. **并发控制**: 各平台独立的并发数配置,防止过载
7. **优雅降级**: comfyui 平台支持内部算力和外部平台的自动切换
系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。