640 lines
24 KiB
Markdown
640 lines
24 KiB
Markdown
# 任务队列后端 - 业务流程分析报告
|
||
|
||
## 一、项目概述
|
||
|
||
**项目名称**: ComfyUI 任务队列后端
|
||
|
||
**核心功能**: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。
|
||
|
||
**技术栈**:
|
||
|
||
- Node.js + Express
|
||
- Redis(任务队列和数据存储)
|
||
- WebSocket(实时通信)
|
||
- Worker Threads(多线程处理)
|
||
|
||
***
|
||
|
||
## 二、系统架构
|
||
|
||
### 2.1 核心组件
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 前端 / 后端客户端 │
|
||
└────────────────────────────┬────────────────────────────────────┘
|
||
│ WebSocket (8087)
|
||
│ HTTP (8089 - 回调)
|
||
▼
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ index.js (主入口) │
|
||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||
│ │ WebSocket │ │ HTTP Server │ │ Worker │ │
|
||
│ │ Server │ │ │ │ Manager │ │
|
||
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
|
||
└─────────┼───────────────────┼───────────────────┼─────────────────┘
|
||
│ │ │
|
||
└───────────────────┴───────────────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ Redis │
|
||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||
│ │ 等待队列 │ │ 处理队列 │ │ 结果队列 │ │
|
||
│ │ (Wait) │ │ (Process) │ │ (Result) │ │
|
||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||
│ │ 错误队列 │ │ 任务数据 │ │ 初始化配置 │ │
|
||
│ │ (Error) │ │ (Hash) │ │ (InitInfo) │ │
|
||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 2.2 Worker Threads 架构
|
||
|
||
系统使用 6 个独立的 Worker Threads 处理不同阶段的任务:
|
||
|
||
| Worker 名称 | 功能描述 | 文件路径 |
|
||
| ---------------- | ------------- | ------------------------------------------ |
|
||
| assessment | 任务预处理、参数校验、入队 | `worker_threads/assessment/assessment.js` |
|
||
| wait | 等待队列监控、任务分发 | `worker_threads/wait/waiting.js` |
|
||
| polling | 轮询外部平台获取任务结果 | `worker_threads/process/process.js` |
|
||
| result | 结果队列处理、返回给客户端 | `worker_threads/result/result.js` |
|
||
| callback\_result | 回调结果处理 | `worker_threads/callback_result/result.js` |
|
||
| error | 错误队列处理 | `worker_threads/error/error.js` |
|
||
|
||
***
|
||
|
||
## 三、完整业务流程
|
||
|
||
### 3.1 流程总览图
|
||
|
||
```mermaid
|
||
flowchart TD
|
||
Start([系统启动]) --> Init[初始化 Redis 队列]
|
||
Init --> WS[WebSocket 服务启动]
|
||
Init --> HTTP[HTTP 服务启动]
|
||
WS --> ReceiveTask[接收任务: type=generate]
|
||
|
||
ReceiveTask --> Assessment[Assessment Worker 处理]
|
||
Assessment --> Validate{参数校验}
|
||
Validate -->|失败| ErrorReturn[返回错误给客户端]
|
||
Validate -->|成功| StoreTask[存储任务到 Redis Hash]
|
||
StoreTask --> PushWait[推入等待队列]
|
||
PushWait --> NotifySuccess[通知客户端: 任务提交成功]
|
||
|
||
PushWait --> WaitWorker[Wait Worker 监控]
|
||
WaitWorker --> CheckConcurrency{检查并发数}
|
||
CheckConcurrency -->|并发未满| GetTasks[批量获取任务]
|
||
GetTasks --> GenerateWorker[Generate Worker 分发]
|
||
GenerateWorker --> ExternalPost[提交任务到外部平台]
|
||
|
||
ExternalPost --> PostResult{提交结果}
|
||
PostResult -->|失败| PushError[推入错误队列]
|
||
PostResult -->|成功| CheckPlatform{平台类型}
|
||
|
||
CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射]
|
||
CheckPlatform -->|轮询型| PushPolling[推入轮询队列]
|
||
|
||
StoreCallback --> WaitCallback[等待外部回调]
|
||
PushPolling --> PollingWorker[Polling Worker 轮询]
|
||
|
||
WaitCallback --> CallbackReceived[收到回调]
|
||
PollingWorker --> PollingResult{轮询结果}
|
||
|
||
CallbackReceived --> ProcessCallback[处理回调数据]
|
||
PollingResult -->|完成| StoreResult[存储结果]
|
||
PollingResult -->|未完成| ContinuePolling[继续轮询]
|
||
|
||
ProcessCallback --> StoreResult
|
||
StoreResult --> PushResultQueue[推入结果队列]
|
||
|
||
PushResultQueue --> ResultWorker[Result Worker 处理]
|
||
ResultWorker --> SendResult[发送结果给客户端]
|
||
|
||
PushError --> ErrorWorker[Error Worker 处理]
|
||
ErrorWorker --> SendError[发送错误给客户端]
|
||
|
||
ContinuePolling --> PollingWorker
|
||
```
|
||
|
||
### 3.2 详细流程步骤
|
||
|
||
#### 阶段 1: 系统启动与初始化
|
||
|
||
**步骤 1.1: 启动主程序**
|
||
|
||
- 入口文件: [index.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\index.js)
|
||
- 加载环境变量 (.env)
|
||
- 创建 6 个 Worker Threads
|
||
|
||
**步骤 1.2: Redis 初始化**
|
||
|
||
- 连接 Redis
|
||
- 初始化队列配置 ([initQueue.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\redis\initQueue.js))
|
||
- 创建 `InitInfo` 配置对象,包含:
|
||
- 等待队列列表
|
||
- 各平台并发配置
|
||
- 任务计数器
|
||
|
||
**步骤 1.3: 启动服务**
|
||
|
||
- HTTP 服务器 (端口: 8087)
|
||
- WebSocket 服务器 (同端口)
|
||
- 回调服务器 (端口: 8089, [app.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\app.js))
|
||
|
||
***
|
||
|
||
#### 阶段 2: 任务接收与预处理
|
||
|
||
**步骤 2.1: 接收 WebSocket 任务**
|
||
|
||
- 客户端通过 WebSocket 连接,携带 token 和 id
|
||
- 验证 token (TOKEN\_SECRET)
|
||
- 接收 `type: "generate"` 消息
|
||
|
||
**步骤 2.2: Assessment Worker 处理**
|
||
|
||
- 文件: [assessment.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\assessment.js)
|
||
- 内部线程池: 3 个 [PreproTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\assessment\PreproTask.js)
|
||
|
||
**步骤 2.3: 参数校验**
|
||
|
||
- 校验必填字段: `taskId`, `platform`, `payload`
|
||
- 失败返回: `JSONError`, `OpcodeError`
|
||
|
||
**步骤 2.4: 存储任务**
|
||
|
||
- 创建任务 Hash: `{prefix}:task:{taskId}`
|
||
- 字段: `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `workflowId`
|
||
- 设置过期时间: 2 小时 (7200秒)
|
||
|
||
**步骤 2.5: 推入等待队列**
|
||
|
||
- 队列命名: `{AIGC}:{platform}:wait`
|
||
- 例如: `digitalHuman-test:comfyui:wait`
|
||
- 更新 `InitInfo.platforms.{key}.WQtasks` +1
|
||
|
||
**步骤 2.6: 通知客户端**
|
||
|
||
- 返回成功消息: `"任务提交成功,正在排队中..."`
|
||
|
||
***
|
||
|
||
#### 阶段 3: 等待队列监控与任务分发
|
||
|
||
**步骤 3.1: Wait Worker 监控**
|
||
|
||
- 文件: [waiting.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\waiting.js)
|
||
- 循环检查各平台等待队列 (间隔: 15秒)
|
||
|
||
**步骤 3.2: 判断并发容量**
|
||
|
||
- 获取 `InitInfo.platforms.{key}.PQtasks` (正在处理数)
|
||
- 获取 `InitInfo.platforms.{key}.MAX_CONCURRENT` (最大并发数)
|
||
- 对于 comfyui 平台,还会检查内部算力 (messageDispatcher)
|
||
|
||
**步骤 3.3: 批量获取任务**
|
||
|
||
- 从等待队列获取任务 ID (不超过剩余并发数)
|
||
- 批量获取任务数据 (Hash)
|
||
|
||
**步骤 3.4: 更新计数器**
|
||
|
||
- `WQtasks` -N (减少等待数)
|
||
- `PQtasks` +N (增加处理数)
|
||
- `PQtasksALL` +N (总处理数)
|
||
|
||
**步骤 3.5: 分发到 Generate Worker**
|
||
|
||
- 文件: [GenerateWorkerManager.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateWorkerManager.js)
|
||
- 内部线程池: [GenerateThreadPool.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\GenerateThreadPool.js)
|
||
|
||
***
|
||
|
||
#### 阶段 4: 提交任务到外部平台
|
||
|
||
**步骤 4.1: Generate Worker 处理**
|
||
|
||
- 文件: [generatTask.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\wait\generatTask.js)
|
||
- 批量调用 `externalPostRequest()`
|
||
|
||
**步骤 4.2: 外部平台接口调用**
|
||
|
||
- 文件: [generat.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\generat.js)
|
||
- 平台适配器: [outside.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\outside.js)
|
||
|
||
**支持的平台**:
|
||
|
||
| 平台 | 类型 | 适配器文件 |
|
||
| ------- | --- | -------------------------------------------------------------------------------------------------------------- |
|
||
| comfyui | 回调型 | [comfyui.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\comfyui.js) |
|
||
| coze | 轮询型 | [coze/coze.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\coze\coze.js) |
|
||
| jimuai | - | [JimuAI.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\outPlatforms\JimuAI.js) |
|
||
|
||
**comfyui 分发策略**:
|
||
|
||
- 优先使用 `messageDispatcher` (内部算力)
|
||
- 失败降级到 `runninghub` (外部平台)
|
||
|
||
**步骤 4.3: 处理提交结果**
|
||
|
||
**情况 A: 提交失败**
|
||
|
||
- 存储错误信息到任务 Hash
|
||
- 推入错误队列: `{prefix}:error:list`
|
||
- `EQtaskALL` +1
|
||
|
||
**情况 B: 提交成功 (回调型平台)**
|
||
|
||
- 存储映射: `{prefix}:callback:{remoteTaskId}` → `taskId`
|
||
- 更新任务 Hash: `remoteTaskId`
|
||
|
||
**情况 C: 提交成功 (轮询型平台)**
|
||
|
||
- 推入轮询队列: `{prefix}:processPolling:{AIGC}:{platform}`
|
||
- Hash 结构: `{remoteTaskId}` → `{taskId, platform, AIGC, workflowId}`
|
||
- 更新任务 Hash: `remoteTaskId`
|
||
|
||
***
|
||
|
||
#### 阶段 5: 任务结果获取
|
||
|
||
**分支 A: 回调型平台 (comfyui)**
|
||
|
||
**步骤 5A.1: 接收回调**
|
||
|
||
- 回调接口: `POST /callback/all` ([callback.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\callback.js))
|
||
- 立即返回 200 响应
|
||
|
||
**步骤 5A.2: 处理回调数据**
|
||
|
||
- 通过 `remoteTaskId` 查询 `taskId`
|
||
- 存储 `eventData` 到任务 Hash 的 `resultData`
|
||
|
||
* 推入回调队列: `{prefix}:callback`
|
||
* `CQtasksALL` +1
|
||
|
||
**步骤 5A.3: Callback Result Worker 处理**
|
||
|
||
- 文件: [callback\_result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\callback_result\result.js)
|
||
- 发送结果给客户端
|
||
|
||
***
|
||
|
||
**分支 B: 轮询型平台 (coze)**
|
||
|
||
**步骤 5B.1: Polling Worker 监控**
|
||
|
||
- 文件: [process.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\process\process.js)
|
||
- 为每个平台启动独立轮询循环
|
||
- 动态轮询间隔: 5-30秒 (根据任务数调整)
|
||
|
||
**步骤 5B.2: 查询外部平台**
|
||
|
||
- 文件: [polling.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\outside\polling.js)
|
||
- 调用 `externalGetRequest()`
|
||
- 批量查询 (每批最多 100 个任务)
|
||
|
||
**步骤 5B.3: 判断任务状态**
|
||
|
||
**情况 1: 任务未完成**
|
||
|
||
- 继续轮询 (不做任何操作)
|
||
|
||
**情况 2: 任务成功**
|
||
|
||
- 存储结果到任务 Hash 的 `resultData`
|
||
- 更新状态: `status = "success"`
|
||
- 推入结果队列: `{prefix}:result:list`
|
||
- 从轮询队列删除
|
||
- `PQtasks` -1, `RQtasksALL` +1
|
||
|
||
**情况 3: 任务失败**
|
||
|
||
- 存储错误信息到 `resultData`
|
||
- 更新状态: `status = "failed"`
|
||
- 推入错误队列: `{prefix}:error:list`
|
||
- 从轮询队列删除
|
||
- `PQtasks` -1, `EQtaskALL` +1
|
||
|
||
***
|
||
|
||
#### 阶段 6: 结果返回给客户端
|
||
|
||
**分支 A: 结果队列处理**
|
||
|
||
- 文件: [result/result.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\result\result.js)
|
||
- 从结果队列获取任务
|
||
- 通过 WebSocket 发送给对应 `backendId` 的客户端
|
||
- 从结果队列删除
|
||
- `RQtasksALL` -1
|
||
|
||
**分支 B: 错误队列处理**
|
||
|
||
- 文件: [error/error.js](file:///d:\WebUI\Kexue\comfyui\comfyui-cluster-bridge\任务队列后端\worker_threads\error\error.js)
|
||
- 从错误队列获取任务
|
||
- 通过 WebSocket 发送错误给客户端
|
||
- 从错误队列和任务 Hash 删除
|
||
- `EQtaskALL` -1, `WQtasks` -1
|
||
|
||
***
|
||
|
||
## 四、任务状态转换图
|
||
|
||
```mermaid
|
||
stateDiagram-v2
|
||
[*] --> pending: 任务创建
|
||
pending --> processing: 从等待队列取出
|
||
processing --> success: 任务成功完成
|
||
processing --> failed: 任务失败
|
||
success --> [*]: 结果已发送
|
||
failed --> [*]: 错误已发送
|
||
|
||
note right of pending
|
||
存储在等待队列
|
||
WQtasks +1
|
||
end note
|
||
|
||
note right of processing
|
||
存储在处理队列/轮询队列
|
||
PQtasks +1
|
||
end note
|
||
|
||
note right of success
|
||
存储在结果队列
|
||
RQtasksALL +1
|
||
end note
|
||
|
||
note right of failed
|
||
存储在错误队列
|
||
EQtaskALL +1
|
||
end note
|
||
```
|
||
|
||
***
|
||
|
||
## 五、Redis 数据结构
|
||
|
||
### 5.1 队列列表
|
||
|
||
| Key 模式 | 类型 | 描述 |
|
||
| ------------------------------------------- | ---- | ------------------------ |
|
||
| `{prefix}:{AIGC}:{platform}:wait` | List | 等待队列,存储 taskId |
|
||
| `{prefix}:process:Polling` | - | (已弃用,见下方) |
|
||
| `{prefix}:processPolling:{AIGC}:{platform}` | Hash | 轮询队列,remoteTaskId → 任务信息 |
|
||
| `{prefix}:result:queue` | - | (已弃用) |
|
||
| `{prefix}:result:list` | List | 结果队列,存储 taskId |
|
||
| `{prefix}:callback` | List | 回调队列,存储 taskId |
|
||
| `{prefix}:error:queue` | - | (已弃用) |
|
||
| `{prefix}:error:list` | List | 错误队列,存储 taskId |
|
||
|
||
### 5.2 任务数据 (Hash)
|
||
|
||
| Key | 类型 | 字段 |
|
||
| ------------------------ | ------ | ---------------------------------------------------------------------------------------------------------- |
|
||
| `{prefix}:task:{taskId}` | Hash | `taskId`, `payload`, `backendId`, `AIGC`, `platform`, `status`, `resultData`, `remoteTaskId`, `workflowId` |
|
||
| <br /> | <br /> | **过期时间**: 7200秒 (2小时) |
|
||
|
||
### 5.3 映射关系
|
||
|
||
| Key | 类型 | 描述 |
|
||
| --------------------------------------------------- | ------ | --------------------- |
|
||
| `{prefix}:callback:{remoteTaskId}` | String | remoteTaskId → taskId |
|
||
| `{prefix}:pending:messages` | List | 待发送消息键列表 |
|
||
| `{prefix}:pending:messages:{backendId}:{timestamp}` | Hash | 待发送消息数据 |
|
||
|
||
### 5.4 初始化配置 (JSON)
|
||
|
||
| Key | 路径 | 描述 |
|
||
| ------------------- | ---------------------------------- | -------- |
|
||
| `{prefix}:InitInfo` | `$.waitQueues` | 等待队列名称数组 |
|
||
| <br /> | `$.processPolling` | 轮询队列名称 |
|
||
| <br /> | `$.processCallback` | 回调队列名称 |
|
||
| <br /> | `$.resultName` | 结果队列名称 |
|
||
| <br /> | `$.PQtasksALL` | 总处理任务数 |
|
||
| <br /> | `$.RQtasksALL` | 总结果任务数 |
|
||
| <br /> | `$.CQtasksALL` | 总回调任务数 |
|
||
| <br /> | `$.EQtaskALL` | 总错误任务数 |
|
||
| <br /> | `$.platforms.{key}.WQtasks` | 平台等待任务数 |
|
||
| <br /> | `$.platforms.{key}.PQtasks` | 平台处理任务数 |
|
||
| <br /> | `$.platforms.{key}.MAX_CONCURRENT` | 平台最大并发数 |
|
||
| <br /> | `$.platforms.{key}.waitQueue` | 平台等待队列名 |
|
||
|
||
***
|
||
|
||
## 六、任务唯一标识与类型
|
||
|
||
### 6.1 任务唯一标识
|
||
|
||
| 标识 | 生成位置 | 用途 |
|
||
| -------------- | ------ | -------- |
|
||
| `taskId` | 前端/客户端 | 内部任务唯一标识 |
|
||
| `remoteTaskId` | 外部平台 | 外部平台任务标识 |
|
||
| `backendId` | 客户端连接时 | 客户端连接标识 |
|
||
|
||
### 6.2 任务类型
|
||
|
||
| 类型 | 说明 | 处理方式 |
|
||
| -------------- | ------- | ---------- |
|
||
| 回调型 (callback) | comfyui | 等待外部回调通知 |
|
||
| 轮询型 (polling) | coze | 主动轮询外部平台状态 |
|
||
|
||
***
|
||
|
||
## 七、关键配置文件
|
||
|
||
### 7.1 model.json
|
||
|
||
```json
|
||
{
|
||
"digitalHuman-test": {
|
||
"comfyui": {
|
||
"apikey": "...",
|
||
"concurrency": 13
|
||
},
|
||
"coze": {
|
||
"apikey": "...",
|
||
"concurrency": 20
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 7.2 Platform.json
|
||
|
||
```json
|
||
{
|
||
"callback": ["comfyui"],
|
||
"polling": ["coze"]
|
||
}
|
||
```
|
||
|
||
### 7.3 .env 环境变量
|
||
|
||
```env
|
||
PROJECT_PREFIX='digitalHuman-test'
|
||
TOKEN_SECRET='...'
|
||
WS_PORT=8087
|
||
CALLBACK_PORT=8089
|
||
RunningHub_URL='...'
|
||
CALLBACK_URL='...'
|
||
REDIS_URL='...'
|
||
MESSAGE_DISPATCHER_URL='...'
|
||
```
|
||
|
||
***
|
||
|
||
## 八、错误码与消息
|
||
|
||
### 8.1 code.json
|
||
|
||
```json
|
||
{
|
||
"ERROR": {
|
||
"JSONError": "消息格式错误,请联系服务商。",
|
||
"OpcodeError": "错误提交,请稍后再试。",
|
||
"BalanceError": "余额不足,请充值后继续使用。",
|
||
"AssessmentError": "任务提交失败,请稍后再试。"
|
||
},
|
||
"SUCCESS": {
|
||
"AssessmentSuccess": "任务提交成功,正在排队中..."
|
||
}
|
||
}
|
||
```
|
||
|
||
***
|
||
|
||
## 九、消息持久化机制
|
||
|
||
当客户端断开连接时,待发送消息会被保存到 Redis,待客户端重连后重试发送:
|
||
|
||
1. **保存待发送消息**: `messagePersistence.savePendingMessage()`
|
||
2. **客户端重连时获取**: `messagePersistence.getPendingMessages()`
|
||
3. **发送成功后删除**: `messagePersistence.removePendingMessage()`
|
||
4. **定期清理过期消息**: 超过 2 天的消息自动清理
|
||
|
||
***
|
||
|
||
## 十、流程图
|
||
|
||
### 10.1 完整数据流时序图
|
||
|
||
```mermaid
|
||
sequenceDiagram
|
||
participant Client as 客户端
|
||
participant WS as WebSocket Server
|
||
participant A as Assessment Worker
|
||
participant Redis as Redis
|
||
participant W as Wait Worker
|
||
participant G as Generate Worker
|
||
participant Ext as 外部平台
|
||
participant P as Polling Worker
|
||
participant R as Result Worker
|
||
|
||
Client->>WS: WebSocket 连接 (token, id)
|
||
WS-->>Client: 连接成功
|
||
|
||
Client->>WS: {type: "generate", taskId, platform, payload}
|
||
WS->>A: 转发任务
|
||
A->>A: 参数校验
|
||
alt 校验失败
|
||
A-->>WS: 返回错误
|
||
WS-->>Client: 错误消息
|
||
else 校验成功
|
||
A->>Redis: HSET {prefix}:task:{taskId}
|
||
A->>Redis: RPUSH {AIGC}:{platform}:wait
|
||
A->>Redis: INCR WQtasks
|
||
A-->>WS: 成功
|
||
WS-->>Client: "任务提交成功,正在排队中..."
|
||
end
|
||
|
||
loop 每15秒检查
|
||
W->>Redis: GET InitInfo.platforms
|
||
W->>Redis: LLEN wait queue
|
||
alt 有可处理任务
|
||
W->>Redis: LRANGE wait queue (批量获取)
|
||
W->>Redis: HGETALL task data
|
||
W->>Redis: LTRIM wait queue
|
||
W->>Redis: DECR WQtasks, INCR PQtasks
|
||
W->>G: 发送任务数据
|
||
G->>Ext: POST /task/create
|
||
Ext-->>G: {remoteTaskId}
|
||
|
||
alt 提交失败
|
||
G->>Redis: HSET resultData (error)
|
||
G->>Redis: LPUSH error:list
|
||
G->>Redis: INCR EQtaskALL
|
||
else 回调型平台
|
||
G->>Redis: SET callback:{remoteTaskId} = taskId
|
||
G->>Redis: HSET remoteTaskId
|
||
else 轮询型平台
|
||
G->>Redis: HSET processPolling:{remoteTaskId}
|
||
G->>Redis: HSET remoteTaskId
|
||
end
|
||
end
|
||
end
|
||
|
||
alt 回调型平台
|
||
Ext->>WS: POST /callback/all
|
||
WS->>Redis: GET callback:{remoteTaskId}
|
||
WS->>Redis: HSET resultData
|
||
WS->>Redis: LPUSH callback
|
||
WS->>Redis: INCR CQtasksALL
|
||
else 轮询型平台
|
||
loop 每5-30秒轮询
|
||
P->>Redis: HGETALL processPolling
|
||
P->>Ext: GET /task/status
|
||
alt 任务未完成
|
||
P->>P: 继续轮询
|
||
else 任务成功
|
||
P->>Redis: HSET resultData
|
||
P->>Redis: LPUSH result:list
|
||
P->>Redis: HDEL processPolling
|
||
P->>Redis: DECR PQtasks, INCR RQtasksALL
|
||
else 任务失败
|
||
P->>Redis: HSET resultData (error)
|
||
P->>Redis: LPUSH error:list
|
||
P->>Redis: HDEL processPolling
|
||
P->>Redis: DECR PQtasks, INCR EQtaskALL
|
||
end
|
||
end
|
||
end
|
||
|
||
loop 每15秒检查
|
||
R->>Redis: LLEN result:list / error:list
|
||
alt 有结果
|
||
R->>Redis: LRANGE result:list
|
||
R->>Redis: HGETALL task data
|
||
R->>WS: 发送结果
|
||
WS->>Client: {taskId, result}
|
||
R->>Redis: LREM result:list
|
||
R->>Redis: DECR RQtasksALL
|
||
else 有错误
|
||
R->>Redis: LRANGE error:list
|
||
R->>Redis: HGETALL task data
|
||
R->>WS: 发送错误
|
||
WS->>Client: {taskId, error}
|
||
R->>Redis: LREM error:list
|
||
R->>Redis: DEL task:{taskId}
|
||
R->>Redis: DECR EQtaskALL, DECR WQtasks
|
||
end
|
||
end
|
||
```
|
||
|
||
***
|
||
|
||
## 十一、总结
|
||
|
||
该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点:
|
||
|
||
1. **多线程架构**: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力
|
||
2. **Redis 作为核心**: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理
|
||
3. **支持多平台**: 可灵活接入不同类型的外部平台(回调型、轮询型)
|
||
4. **任务状态追踪**: 完整的任务生命周期管理,从接收、处理到完成/失败
|
||
5. **消息持久化**: 支持客户端断线重连后的消息补发
|
||
6. **并发控制**: 各平台独立的并发数配置,防止过载
|
||
7. **优雅降级**: comfyui 平台支持内部算力和外部平台的自动切换
|
||
|
||
系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。
|