shuzhiren-comfyui/任务队列后端/业务流程分析报告.md

24 KiB
Raw Permalink Blame History

任务队列后端 - 业务流程分析报告

一、项目概述

项目名称: ComfyUI 任务队列后端

核心功能: 一个分布式任务处理系统,用于管理和分发 AI 图像生成任务到多个外部平台(如 comfyui、coze 等),支持任务队列、并发控制、状态跟踪和结果返回。

技术栈:

  • Node.js + Express
  • Redis任务队列和数据存储
  • WebSocket实时通信
  • Worker Threads多线程处理

二、系统架构

2.1 核心组件

┌─────────────────────────────────────────────────────────────────┐
│                        前端 / 后端客户端                          │
└────────────────────────────┬────────────────────────────────────┘
                             │ WebSocket (8087)
                             │ HTTP (8089 - 回调)
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                     index.js (主入口)                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │
│  │ WebSocket    │  │  HTTP Server │  │  Worker      │        │
│  │  Server      │  │              │  │  Manager     │        │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘        │
└─────────┼───────────────────┼───────────────────┼─────────────────┘
          │                   │                   │
          └───────────────────┴───────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                           Redis                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │
│  │  等待队列    │  │  处理队列    │  │  结果队列    │        │
│  │  (Wait)      │  │  (Process)   │  │  (Result)    │        │
│  └──────────────┘  └──────────────┘  └──────────────┘        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │
│  │  错误队列    │  │  任务数据    │  │  初始化配置  │        │
│  │  (Error)     │  │  (Hash)      │  │  (InitInfo)  │        │
│  └──────────────┘  └──────────────┘  └──────────────┘        │
└─────────────────────────────────────────────────────────────────┘

2.2 Worker Threads 架构

系统使用 6 个独立的 Worker Threads 处理不同阶段的任务:

Worker 名称 功能描述 文件路径
assessment 任务预处理、参数校验、入队 worker_threads/assessment/assessment.js
wait 等待队列监控、任务分发 worker_threads/wait/waiting.js
polling 轮询外部平台获取任务结果 worker_threads/process/process.js
result 结果队列处理、返回给客户端 worker_threads/result/result.js
callback_result 回调结果处理 worker_threads/callback_result/result.js
error 错误队列处理 worker_threads/error/error.js

三、完整业务流程

3.1 流程总览图

flowchart TD
    Start([系统启动]) --> Init[初始化 Redis 队列]
    Init --> WS[WebSocket 服务启动]
    Init --> HTTP[HTTP 服务启动]
    WS --> ReceiveTask[接收任务: type=generate]
    
    ReceiveTask --> Assessment[Assessment Worker 处理]
    Assessment --> Validate{参数校验}
    Validate -->|失败| ErrorReturn[返回错误给客户端]
    Validate -->|成功| StoreTask[存储任务到 Redis Hash]
    StoreTask --> PushWait[推入等待队列]
    PushWait --> NotifySuccess[通知客户端: 任务提交成功]
    
    PushWait --> WaitWorker[Wait Worker 监控]
    WaitWorker --> CheckConcurrency{检查并发数}
    CheckConcurrency -->|并发未满| GetTasks[批量获取任务]
    GetTasks --> GenerateWorker[Generate Worker 分发]
    GenerateWorker --> ExternalPost[提交任务到外部平台]
    
    ExternalPost --> PostResult{提交结果}
    PostResult -->|失败| PushError[推入错误队列]
    PostResult -->|成功| CheckPlatform{平台类型}
    
    CheckPlatform -->|回调型| StoreCallback[存储 remoteTaskId 映射]
    CheckPlatform -->|轮询型| PushPolling[推入轮询队列]
    
    StoreCallback --> WaitCallback[等待外部回调]
    PushPolling --> PollingWorker[Polling Worker 轮询]
    
    WaitCallback --> CallbackReceived[收到回调]
    PollingWorker --> PollingResult{轮询结果}
    
    CallbackReceived --> ProcessCallback[处理回调数据]
    PollingResult -->|完成| StoreResult[存储结果]
    PollingResult -->|未完成| ContinuePolling[继续轮询]
    
    ProcessCallback --> StoreResult
    StoreResult --> PushResultQueue[推入结果队列]
    
    PushResultQueue --> ResultWorker[Result Worker 处理]
    ResultWorker --> SendResult[发送结果给客户端]
    
    PushError --> ErrorWorker[Error Worker 处理]
    ErrorWorker --> SendError[发送错误给客户端]
    
    ContinuePolling --> PollingWorker

3.2 详细流程步骤

阶段 1: 系统启动与初始化

步骤 1.1: 启动主程序

  • 入口文件: index.js
  • 加载环境变量 (.env)
  • 创建 6 个 Worker Threads

步骤 1.2: Redis 初始化

  • 连接 Redis
  • 初始化队列配置 (initQueue.js)
  • 创建 InitInfo 配置对象,包含:
    • 等待队列列表
    • 各平台并发配置
    • 任务计数器

步骤 1.3: 启动服务

  • HTTP 服务器 (端口: 8087)
  • WebSocket 服务器 (同端口)
  • 回调服务器 (端口: 8089, app.js)

阶段 2: 任务接收与预处理

步骤 2.1: 接收 WebSocket 任务

  • 客户端通过 WebSocket 连接,携带 token 和 id
  • 验证 token (TOKEN_SECRET)
  • 接收 type: "generate" 消息

步骤 2.2: Assessment Worker 处理

步骤 2.3: 参数校验

  • 校验必填字段: taskId, platform, payload
  • 失败返回: JSONError, OpcodeError

步骤 2.4: 存储任务

  • 创建任务 Hash: {prefix}:task:{taskId}
  • 字段: taskId, payload, backendId, AIGC, platform, status, workflowId
  • 设置过期时间: 2 小时 (7200秒)

步骤 2.5: 推入等待队列

  • 队列命名: {AIGC}:{platform}:wait
  • 例如: digitalHuman-test:comfyui:wait
  • 更新 InitInfo.platforms.{key}.WQtasks +1

步骤 2.6: 通知客户端

  • 返回成功消息: "任务提交成功,正在排队中..."

阶段 3: 等待队列监控与任务分发

步骤 3.1: Wait Worker 监控

  • 文件: waiting.js
  • 循环检查各平台等待队列 (间隔: 15秒)

步骤 3.2: 判断并发容量

  • 获取 InitInfo.platforms.{key}.PQtasks (正在处理数)
  • 获取 InitInfo.platforms.{key}.MAX_CONCURRENT (最大并发数)
  • 对于 comfyui 平台,还会检查内部算力 (messageDispatcher)

步骤 3.3: 批量获取任务

  • 从等待队列获取任务 ID (不超过剩余并发数)
  • 批量获取任务数据 (Hash)

步骤 3.4: 更新计数器

  • WQtasks -N (减少等待数)
  • PQtasks +N (增加处理数)
  • PQtasksALL +N (总处理数)

步骤 3.5: 分发到 Generate Worker


阶段 4: 提交任务到外部平台

步骤 4.1: Generate Worker 处理

步骤 4.2: 外部平台接口调用

支持的平台:

平台 类型 适配器文件
comfyui 回调型 comfyui.js
coze 轮询型 coze/coze.js
jimuai - JimuAI.js

comfyui 分发策略:

  • 优先使用 messageDispatcher (内部算力)
  • 失败降级到 runninghub (外部平台)

步骤 4.3: 处理提交结果

情况 A: 提交失败

  • 存储错误信息到任务 Hash
  • 推入错误队列: {prefix}:error:list
  • EQtaskALL +1

情况 B: 提交成功 (回调型平台)

  • 存储映射: {prefix}:callback:{remoteTaskId}taskId
  • 更新任务 Hash: remoteTaskId

情况 C: 提交成功 (轮询型平台)

  • 推入轮询队列: {prefix}:processPolling:{AIGC}:{platform}
  • Hash 结构: {remoteTaskId}{taskId, platform, AIGC, workflowId}
  • 更新任务 Hash: remoteTaskId

阶段 5: 任务结果获取

分支 A: 回调型平台 (comfyui)

步骤 5A.1: 接收回调

  • 回调接口: POST /callback/all (callback.js)
  • 立即返回 200 响应

步骤 5A.2: 处理回调数据

  • 通过 remoteTaskId 查询 taskId
  • 存储 eventData 到任务 Hash 的 resultData
  • 推入回调队列: {prefix}:callback
  • CQtasksALL +1

步骤 5A.3: Callback Result Worker 处理


分支 B: 轮询型平台 (coze)

步骤 5B.1: Polling Worker 监控

  • 文件: process.js
  • 为每个平台启动独立轮询循环
  • 动态轮询间隔: 5-30秒 (根据任务数调整)

步骤 5B.2: 查询外部平台

  • 文件: polling.js
  • 调用 externalGetRequest()
  • 批量查询 (每批最多 100 个任务)

步骤 5B.3: 判断任务状态

情况 1: 任务未完成

  • 继续轮询 (不做任何操作)

情况 2: 任务成功

  • 存储结果到任务 Hash 的 resultData
  • 更新状态: status = "success"
  • 推入结果队列: {prefix}:result:list
  • 从轮询队列删除
  • PQtasks -1, RQtasksALL +1

情况 3: 任务失败

  • 存储错误信息到 resultData
  • 更新状态: status = "failed"
  • 推入错误队列: {prefix}:error:list
  • 从轮询队列删除
  • PQtasks -1, EQtaskALL +1

阶段 6: 结果返回给客户端

分支 A: 结果队列处理

  • 文件: result/result.js
  • 从结果队列获取任务
  • 通过 WebSocket 发送给对应 backendId 的客户端
  • 从结果队列删除
  • RQtasksALL -1

分支 B: 错误队列处理

  • 文件: error/error.js
  • 从错误队列获取任务
  • 通过 WebSocket 发送错误给客户端
  • 从错误队列和任务 Hash 删除
  • EQtaskALL -1, WQtasks -1

四、任务状态转换图

stateDiagram-v2
    [*] --> pending: 任务创建
    pending --> processing: 从等待队列取出
    processing --> success: 任务成功完成
    processing --> failed: 任务失败
    success --> [*]: 结果已发送
    failed --> [*]: 错误已发送
    
    note right of pending
        存储在等待队列
        WQtasks +1
    end note
    
    note right of processing
        存储在处理队列/轮询队列
        PQtasks +1
    end note
    
    note right of success
        存储在结果队列
        RQtasksALL +1
    end note
    
    note right of failed
        存储在错误队列
        EQtaskALL +1
    end note

五、Redis 数据结构

5.1 队列列表

Key 模式 类型 描述
{prefix}:{AIGC}:{platform}:wait List 等待队列,存储 taskId
{prefix}:process:Polling - (已弃用,见下方)
{prefix}:processPolling:{AIGC}:{platform} Hash 轮询队列remoteTaskId → 任务信息
{prefix}:result:queue - (已弃用)
{prefix}:result:list List 结果队列,存储 taskId
{prefix}:callback List 回调队列,存储 taskId
{prefix}:error:queue - (已弃用)
{prefix}:error:list List 错误队列,存储 taskId

5.2 任务数据 (Hash)

Key 类型 字段
{prefix}:task:{taskId} Hash taskId, payload, backendId, AIGC, platform, status, resultData, remoteTaskId, workflowId


过期时间: 7200秒 (2小时)

5.3 映射关系

Key 类型 描述
{prefix}:callback:{remoteTaskId} String remoteTaskId → taskId
{prefix}:pending:messages List 待发送消息键列表
{prefix}:pending:messages:{backendId}:{timestamp} Hash 待发送消息数据

5.4 初始化配置 (JSON)

Key 路径 描述
{prefix}:InitInfo $.waitQueues 等待队列名称数组

$.processPolling 轮询队列名称

$.processCallback 回调队列名称

$.resultName 结果队列名称

$.PQtasksALL 总处理任务数

$.RQtasksALL 总结果任务数

$.CQtasksALL 总回调任务数

$.EQtaskALL 总错误任务数

$.platforms.{key}.WQtasks 平台等待任务数

$.platforms.{key}.PQtasks 平台处理任务数

$.platforms.{key}.MAX_CONCURRENT 平台最大并发数

$.platforms.{key}.waitQueue 平台等待队列名

六、任务唯一标识与类型

6.1 任务唯一标识

标识 生成位置 用途
taskId 前端/客户端 内部任务唯一标识
remoteTaskId 外部平台 外部平台任务标识
backendId 客户端连接时 客户端连接标识

6.2 任务类型

类型 说明 处理方式
回调型 (callback) comfyui 等待外部回调通知
轮询型 (polling) coze 主动轮询外部平台状态

七、关键配置文件

7.1 model.json

{
  "digitalHuman-test": {
    "comfyui": {
      "apikey": "...",
      "concurrency": 13
    },
    "coze": {
      "apikey": "...",
      "concurrency": 20
    }
  }
}

7.2 Platform.json

{
  "callback": ["comfyui"],
  "polling": ["coze"]
}

7.3 .env 环境变量

PROJECT_PREFIX='digitalHuman-test'
TOKEN_SECRET='...'
WS_PORT=8087
CALLBACK_PORT=8089
RunningHub_URL='...'
CALLBACK_URL='...'
REDIS_URL='...'
MESSAGE_DISPATCHER_URL='...'

八、错误码与消息

8.1 code.json

{
  "ERROR": {
    "JSONError": "消息格式错误,请联系服务商。",
    "OpcodeError": "错误提交,请稍后再试。",
    "BalanceError": "余额不足,请充值后继续使用。",
    "AssessmentError": "任务提交失败,请稍后再试。"
  },
  "SUCCESS": {
    "AssessmentSuccess": "任务提交成功,正在排队中..."
  }
}

九、消息持久化机制

当客户端断开连接时,待发送消息会被保存到 Redis待客户端重连后重试发送

  1. 保存待发送消息: messagePersistence.savePendingMessage()
  2. 客户端重连时获取: messagePersistence.getPendingMessages()
  3. 发送成功后删除: messagePersistence.removePendingMessage()
  4. 定期清理过期消息: 超过 2 天的消息自动清理

十、流程图

10.1 完整数据流时序图

sequenceDiagram
    participant Client as 客户端
    participant WS as WebSocket Server
    participant A as Assessment Worker
    participant Redis as Redis
    participant W as Wait Worker
    participant G as Generate Worker
    participant Ext as 外部平台
    participant P as Polling Worker
    participant R as Result Worker
    
    Client->>WS: WebSocket 连接 (token, id)
    WS-->>Client: 连接成功
    
    Client->>WS: {type: "generate", taskId, platform, payload}
    WS->>A: 转发任务
    A->>A: 参数校验
    alt 校验失败
        A-->>WS: 返回错误
        WS-->>Client: 错误消息
    else 校验成功
        A->>Redis: HSET {prefix}:task:{taskId}
        A->>Redis: RPUSH {AIGC}:{platform}:wait
        A->>Redis: INCR WQtasks
        A-->>WS: 成功
        WS-->>Client: "任务提交成功,正在排队中..."
    end
    
    loop 每15秒检查
        W->>Redis: GET InitInfo.platforms
        W->>Redis: LLEN wait queue
        alt 有可处理任务
            W->>Redis: LRANGE wait queue (批量获取)
            W->>Redis: HGETALL task data
            W->>Redis: LTRIM wait queue
            W->>Redis: DECR WQtasks, INCR PQtasks
            W->>G: 发送任务数据
            G->>Ext: POST /task/create
            Ext-->>G: {remoteTaskId}
            
            alt 提交失败
                G->>Redis: HSET resultData (error)
                G->>Redis: LPUSH error:list
                G->>Redis: INCR EQtaskALL
            else 回调型平台
                G->>Redis: SET callback:{remoteTaskId} = taskId
                G->>Redis: HSET remoteTaskId
            else 轮询型平台
                G->>Redis: HSET processPolling:{remoteTaskId}
                G->>Redis: HSET remoteTaskId
            end
        end
    end
    
    alt 回调型平台
        Ext->>WS: POST /callback/all
        WS->>Redis: GET callback:{remoteTaskId}
        WS->>Redis: HSET resultData
        WS->>Redis: LPUSH callback
        WS->>Redis: INCR CQtasksALL
    else 轮询型平台
        loop 每5-30秒轮询
            P->>Redis: HGETALL processPolling
            P->>Ext: GET /task/status
            alt 任务未完成
                P->>P: 继续轮询
            else 任务成功
                P->>Redis: HSET resultData
                P->>Redis: LPUSH result:list
                P->>Redis: HDEL processPolling
                P->>Redis: DECR PQtasks, INCR RQtasksALL
            else 任务失败
                P->>Redis: HSET resultData (error)
                P->>Redis: LPUSH error:list
                P->>Redis: HDEL processPolling
                P->>Redis: DECR PQtasks, INCR EQtaskALL
            end
        end
    end
    
    loop 每15秒检查
        R->>Redis: LLEN result:list / error:list
        alt 有结果
            R->>Redis: LRANGE result:list
            R->>Redis: HGETALL task data
            R->>WS: 发送结果
            WS->>Client: {taskId, result}
            R->>Redis: LREM result:list
            R->>Redis: DECR RQtasksALL
        else 有错误
            R->>Redis: LRANGE error:list
            R->>Redis: HGETALL task data
            R->>WS: 发送错误
            WS->>Client: {taskId, error}
            R->>Redis: LREM error:list
            R->>Redis: DEL task:{taskId}
            R->>Redis: DECR EQtaskALL, DECR WQtasks
        end
    end

十一、总结

该任务队列后端系统是一个设计完善的分布式任务处理系统,具有以下特点:

  1. 多线程架构: 使用 Worker Threads 实现各阶段解耦,提高并发处理能力
  2. Redis 作为核心: 利用 Redis 的 List、Hash、JSON 等数据结构实现任务队列和状态管理
  3. 支持多平台: 可灵活接入不同类型的外部平台(回调型、轮询型)
  4. 任务状态追踪: 完整的任务生命周期管理,从接收、处理到完成/失败
  5. 消息持久化: 支持客户端断线重连后的消息补发
  6. 并发控制: 各平台独立的并发数配置,防止过载
  7. 优雅降级: comfyui 平台支持内部算力和外部平台的自动切换

系统的核心流程清晰,各组件职责明确,是一个生产级别的任务队列管理系统。