/** * TaskForwarder - 任务转发器 * * 设计说明: * - clientId 使用实例 ID(固定不变),实现 WebSocket 连接复用 * - prompt_id 使用 taskId,便于任务追踪和查询 * - 同一实例的所有任务共享同一个 WebSocket 连接 * - 通过 prompt_id 区分不同任务的消息 */ import { v4 as uuidv4 } from 'uuid'; import logger from '../logger/index.js'; import clusterManager from '../cluster-manager/index.js'; import webSocketClient from '../websocket-client/index.js'; import axios from 'axios'; import taskQueueClient from '../task-queue-client/index.js'; import fileUploader from '../file-uploader/index.js'; import config from '../config/index.js'; import fs from 'fs'; import path from 'path'; class TaskForwarder { constructor() { this.tasks = new Map(); this.setupEventListeners(); } setupEventListeners() { logger.info('[TaskForwarder] 设置事件监听器'); webSocketClient.on('execution_start', ({ instanceId, promptId }) => { logger.info(`[TaskForwarder] 收到 execution_start 事件: instanceId=${instanceId}, promptId=${promptId}`); this.handleExecutionStart(instanceId, promptId).catch(err => { logger.error('处理 execution_start 事件失败:', err); }); }); webSocketClient.on('progress', ({ instanceId, data }) => { this.handleProgress(instanceId, data).catch(err => { logger.error('处理 progress 事件失败:', err); }); }); webSocketClient.on('executed', ({ instanceId, data }) => { logger.info(`[TaskForwarder] 收到 executed 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); this.handleExecuted(instanceId, data).catch(err => { logger.error('处理 executed 事件失败:', err); }); }); webSocketClient.on('execution_success', ({ instanceId, data }) => { logger.info(`[TaskForwarder] 收到 execution_success 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); this.handleExecutionSuccess(instanceId, data).catch(err => { logger.error('处理 execution_success 事件失败:', err); }); }); webSocketClient.on('execution_error', ({ instanceId, data }) => { logger.error(`[TaskForwarder] 收到 execution_error 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); this.handleExecutionError(instanceId, data).catch(err => { logger.error('处理 execution_error 事件失败:', err); }); }); } async submitTask(workflow, nodeInfoList = [], workflowId = null, instanceId = null, webhookUrl = null, queueTaskId = null) { const taskId = queueTaskId || uuidv4(); let instance; if (instanceId) { instance = clusterManager.getInstance(instanceId); if (!instance) { throw new Error(`实例 ${instanceId} 不存在`); } } else { instance = clusterManager.selectInstance(); if (!instance) { throw new Error('没有可用的实例'); } } const task = { id: taskId, promptId: taskId, workflow, nodeInfoList, workflowId, webhookUrl, queueTaskId, instanceId: instance.id, status: 'pending', progress: 0, createdAt: new Date().toISOString(), startedAt: null, completedAt: null, result: null, error: null }; this.tasks.set(taskId, task); logger.info(`任务已创建: ${taskId}, 分配到实例: ${instance.id}`); try { await this.sendTaskToInstance(task, instance); return taskId; } catch (error) { task.status = 'failed'; task.error = error.message; this.tasks.set(taskId, task); logger.error(`任务 ${taskId} 提交失败:`, error); if (webhookUrl) { await this.sendWebhookCallback(task, null, error.message); } throw error; } } async sendTaskToInstance(task, instance) { logger.info(`[TaskForwarder] 准备发送任务 ${task.id} 到实例 ${instance.id}`); logger.info(`[TaskForwarder] 实例信息: ${JSON.stringify({ id: instance.id, wsUrl: instance.wsUrl, apiUrl: instance.apiUrl })}`); const wsClientId = instance.id; const wsUrlWithClientId = `${instance.wsUrl}?clientId=${wsClientId}`; logger.info(`[TaskForwarder] WebSocket URL (clientId=实例ID): ${wsUrlWithClientId}`); await webSocketClient.connect(instance.id, wsUrlWithClientId); const promptPayload = { prompt: task.workflow, prompt_id: task.id, client_id: wsClientId }; logger.info(`[TaskForwarder] 发送的 prompt 消息结构: prompt_id=${task.id}, client_id=${wsClientId}, workflow节点数=${Object.keys(task.workflow || {}).length}`); logger.info(`[TaskForwarder] 通过 HTTP POST /prompt 提交任务到 ${instance.apiUrl}/prompt`); try { const response = await axios.post(`${instance.apiUrl}/prompt`, promptPayload, { headers: { 'Content-Type': 'application/json' }, timeout: 30000 }); logger.info(`[TaskForwarder] HTTP POST /prompt 响应: ${JSON.stringify(response.data)}`); if (response.data?.node_errors && Object.keys(response.data.node_errors).length > 0) { const nodeErrors = response.data.node_errors; const errorMessages = []; for (const [nodeId, errorInfo] of Object.entries(nodeErrors)) { if (errorInfo.errors && errorInfo.errors.length > 0) { for (const err of errorInfo.errors) { errorMessages.push(`节点 ${nodeId}: ${err.message}${err.details ? ` (${err.details})` : ''}`); } } } const fullErrorMessage = `工作流节点错误: ${errorMessages.join('; ')}`; logger.error(`[TaskForwarder] ${fullErrorMessage}`); throw new Error(fullErrorMessage); } const returnedPromptId = response.data?.prompt_id || response.data?.promptId; logger.info(`[TaskForwarder] 任务 ${task.id} 已提交,ComfyUI 返回 prompt_id: ${returnedPromptId}`); } catch (error) { let errorMessage = error.message; if (error.response && error.response.data) { const comfyError = error.response.data; logger.error(`[TaskForwarder] 错误响应: ${JSON.stringify(comfyError)}`); if (comfyError.error && comfyError.error.message) { errorMessage = comfyError.error.message; } else if (comfyError.message) { errorMessage = comfyError.message; } else if (typeof comfyError === 'string') { errorMessage = comfyError; } else { errorMessage = JSON.stringify(comfyError); } logger.error(`[TaskForwarder] 提取的错误信息: ${errorMessage}`); } throw new Error(errorMessage); } task.status = 'submitted'; logger.info(`任务 ${task.id} 已发送到实例 ${instance.id}`); } async handleExecutionStart(instanceId, promptId) { logger.info(`[TaskForwarder] handleExecutionStart: instanceId=${instanceId}, promptId=${promptId}`); const task = this.tasks.get(promptId); if (!task) { logger.warn(`[TaskForwarder] 未找到任务: promptId=${promptId}`); return; } if (task.instanceId !== instanceId) { logger.warn(`[TaskForwarder] 任务实例不匹配: taskId=${promptId}, task.instanceId=${task.instanceId}, event.instanceId=${instanceId}`); return; } if (task.status !== 'submitted') { logger.warn(`[TaskForwarder] 任务状态不正确: taskId=${promptId}, status=${task.status}`); return; } task.status = 'running'; task.startedAt = new Date().toISOString(); this.tasks.set(promptId, task); clusterManager.updateInstanceStatus(instanceId, 'busy'); logger.info(`任务 ${task.id} 开始执行`); } async handleProgress(instanceId, data) { if (!data?.prompt_id) { return; } const task = this.tasks.get(data.prompt_id); if (!task || task.instanceId !== instanceId) { return; } if (data.max && data.max > 0) { task.progress = Math.round((data.value / data.max) * 100); this.tasks.set(data.prompt_id, task); logger.info(`[TaskForwarder] 任务 ${data.prompt_id} 进度: ${task.progress}%`); } } /** * 处理节点执行完成事件 * * 说明: * - 每个节点执行完成后,ComfyUI会发送executed WebSocket消息 * - 如果节点有输出(如PreviewImage),会包含output字段 * - 这里仅收集部分结果,完整结果在handleExecutionSuccess中处理 * * @param {string} instanceId - ComfyUI实例ID * @param {Object} data - WebSocket消息数据 * @param {string} data.prompt_id - 任务ID * @param {string} data.node - 节点ID * @param {Object} data.output - 节点输出(可选) */ async handleExecuted(instanceId, data) { logger.info(`[TaskForwarder] handleExecuted: instanceId=${instanceId}, promptId=${data?.prompt_id}, node=${data?.node}`); // 检查是否有输出数据 if (!data?.output) { logger.info(`[TaskForwarder] executed 消息无输出,跳过处理`); return; } const promptId = data.prompt_id; const task = this.tasks.get(promptId); if (!task || task.instanceId !== instanceId) { return; } // 初始化部分结果数组 if (!task.partialResults) { task.partialResults = []; } // 收集当前节点的输出结果 task.partialResults.push(data); logger.info(`[TaskForwarder] 收集节点 ${data.node} 的输出结果`); } async handleExecutionError(instanceId, data) { logger.error(`[TaskForwarder] handleExecutionError: instanceId=${instanceId}, promptId=${data?.prompt_id}`); const promptId = data.prompt_id; const task = this.tasks.get(promptId); if (!task || task.instanceId !== instanceId) { return; } task.status = 'failed'; task.completedAt = new Date().toISOString(); task.error = data.exception_message || data.error || JSON.stringify(data); this.tasks.set(promptId, task); clusterManager.updateInstanceStatus(instanceId, 'online'); logger.error(`任务 ${task.id} 执行失败: ${task.error}`); if (task.webhookUrl) { await this.sendWebhookCallback(task, null, task.error); } if (task.queueTaskId) { taskQueueClient.notifyTaskComplete(task.queueTaskId, null, task.error); } } /** * 处理任务执行成功事件 * * 这是文件上传流程的入口点: * 1. 收到ComfyUI的execution_success WebSocket消息后触发 * 2. 调用ComfyUI的/history/{prompt_id} API获取任务输出信息 * 3. 从history返回的outputs中提取文件信息 * 4. 调用processHistoryOutputs处理并上传所有输出文件 * * @param {string} instanceId - ComfyUI实例ID * @param {Object} data - WebSocket消息数据,包含prompt_id */ async handleExecutionSuccess(instanceId, data) { logger.info(`[TaskForwarder] handleExecutionSuccess: instanceId=${instanceId}, promptId=${data?.prompt_id}`); const promptId = data.prompt_id; const task = this.tasks.get(promptId); if (!task) { logger.warn(`[TaskForwarder] 未找到任务: promptId=${promptId}`); return; } if (task.instanceId !== instanceId) { logger.warn(`[TaskForwarder] 任务实例不匹配: taskId=${promptId}`); return; } logger.info(`[TaskForwarder] 找到匹配的任务: ${promptId}, 准备获取结果`); const instance = clusterManager.getInstance(instanceId); if (!instance) { logger.error(`[TaskForwarder] 无法找到实例: ${instanceId}`); return; } try { // ========== 步骤1:调用ComfyUI history API获取任务输出信息 ========== // history API返回格式: { [prompt_id]: { outputs: { [nodeId]: { images: [...] } } } } const historyResponse = await axios.get(`${instance.apiUrl}/history/${promptId}`, { timeout: 10000 }); const historyData = historyResponse.data; logger.info(`[TaskForwarder] 获取到历史记录: ${JSON.stringify(historyData)}`); // ========== 步骤2:从history数据中提取outputs ========== // outputs包含所有输出节点的文件信息 let outputs = null; if (historyData && historyData[promptId] && historyData[promptId].outputs) { outputs = historyData[promptId].outputs; } else if (historyData && historyData.outputs) { outputs = historyData.outputs; } // ========== 步骤3:处理并上传输出文件 ========== if (outputs) { // 调用processHistoryOutputs处理所有输出文件 // 这是上传流程的核心方法 const resultData = await this.processHistoryOutputs(outputs, instanceId); // 更新任务状态为完成 task.status = 'completed'; task.completedAt = new Date().toISOString(); task.result = outputs; this.tasks.set(promptId, task); clusterManager.updateInstanceStatus(instanceId, 'online'); logger.info(`任务 ${task.id} 执行完成,结果数量: ${resultData.length}`); // 发送webhook回调通知调用方 if (task.webhookUrl) { await this.sendWebhookCallback(task, { output: outputs }, null); } // 通知任务队列客户端任务完成 if (task.queueTaskId) { taskQueueClient.notifyTaskComplete(task.queueTaskId, resultData); } } else { logger.warn(`[TaskForwarder] 历史记录中没有 outputs`); } } catch (error) { logger.error(`[TaskForwarder] 获取历史记录失败: ${error.message}`); task.status = 'failed'; task.error = `获取结果失败: ${error.message}`; this.tasks.set(promptId, task); if (task.webhookUrl) { await this.sendWebhookCallback(task, null, task.error); } if (task.queueTaskId) { taskQueueClient.notifyTaskComplete(task.queueTaskId, null, task.error); } } } /** * 处理历史记录中的输出结果 * * 流程说明: * 1. 从ComfyUI的history API获取的outputs中提取所有类型的媒体文件信息 * 2. outputs结构: { [nodeId]: { images: [...], gifs: [...], audio: [...], ... } } * 3. 自动检测输出中的所有数组类型字段(images/gifs/audio/text等) * 4. 每个媒体文件包含: filename, subfolder, type 等信息 * 5. 遍历所有节点的输出,逐个上传文件到外部服务器 * 6. 非文件类型输出(如text, value等)会被跳过,不会作为文件处理 * * @param {Object} outputs - ComfyUI history返回的输出对象 * @param {string} instanceId - ComfyUI实例ID * @returns {Array} 上传结果数组,包含fileUrl、fileType等信息 */ async processHistoryOutputs(outputs, instanceId) { const resultData = []; if (!outputs) { return resultData; } const instance = clusterManager.getInstance(instanceId); if (!instance) { logger.error(`[processHistoryOutputs] 无法找到实例: ${instanceId}`); return resultData; } // 定义有效的文件输出类型(这些类型包含filename字段) const FILE_OUTPUT_TYPES = ['images', 'gifs', 'video', 'audio', 'files']; // 定义非文件类型的输出(这些是元数据或简单值,不需要上传) const NON_FILE_OUTPUT_TYPES = ['text', 'value', 'string', 'number', 'boolean', 'object']; for (const [nodeId, output] of Object.entries(outputs)) { if (!output) continue; const mediaFiles = []; const typeCounts = {}; const processedTypes = new Set(); for (const [key, value] of Object.entries(output)) { // 跳过已处理的类型 if (processedTypes.has(key)) continue; processedTypes.add(key); // 检查是否为数组类型 if (!Array.isArray(value)) { // 如果是非文件类型的非数组值,记录但不处理 if (NON_FILE_OUTPUT_TYPES.includes(key)) { typeCounts[key] = 'skipped (non-file type)'; } continue; } // 检查数组中的元素是否为有效的文件对象(包含filename字段) const validFiles = value.filter(item => item && typeof item === 'object' && 'filename' in item); if (validFiles.length > 0) { // 有效的文件数组 mediaFiles.push(...validFiles); typeCounts[key] = validFiles.length; } else if (value.length > 0) { // 数组但没有有效的filename,可能是text/value等类型 typeCounts[key] = `skipped (${value.length} items, no filename)`; logger.debug(`[processHistoryOutputs] 节点 ${nodeId} 跳过非文件类型: ${key}, 数量: ${value.length}`); } } // 打印日志,显示各类型的数量 const typeCountStr = Object.entries(typeCounts) .map(([type, count]) => `${type}=${count}`) .join(', '); if (mediaFiles.length > 0) { logger.info(`[processHistoryOutputs] 节点 ${nodeId} 找到 ${mediaFiles.length} 个媒体文件 (${typeCountStr})`); } else { logger.debug(`[processHistoryOutputs] 节点 ${nodeId} 无有效媒体文件: ${typeCountStr}`); } for (const media of mediaFiles) { try { // 再次检查filename是否存在 if (!media.filename) { logger.warn(`[processHistoryOutputs] 跳过没有filename的媒体: nodeId=${nodeId}`); continue; } logger.info(`[processHistoryOutputs] 正在上传文件: ${media.filename}`); const fileUrl = await this.uploadImage(media, instance); logger.info(`[processHistoryOutputs] 文件上传成功: ${fileUrl}`); resultData.push({ fileUrl, fileType: media.type || 'png', taskCostTime: 0, nodeId }); } catch (error) { logger.error('[processHistoryOutputs] 上传文件失败:', error); } } } return resultData; } /** * 上传单个媒体文件到外部服务器 * * 支持的文件类型: * - 图片:images (png, jpg等) * - 动图:gifs * - 音频:audio (flac, mp3, wav等) * * 上传流程(两种方式): * * 方式一:本地文件直接上传(优先) * - 条件:环境变量 COMFYUI_OUTPUT_DIR 已配置,且文件存在于本地 * - 适用场景:bridge服务与ComfyUI在同一台服务器,可直接访问output目录 * - 流程:直接读取本地文件 → 上传到外部服务器 * * 方式二:通过ComfyUI HTTP API获取文件(回退方案) * - 条件:本地文件不存在或COMFYUI_OUTPUT_DIR未配置 * - 适用场景:bridge服务与ComfyUI不在同一台服务器 * - 流程:调用ComfyUI的/view API → 保存到本地临时目录 → 上传到外部服务器 → 删除临时文件 * * @param {Object} media - 媒体文件信息对象 * @param {string} media.filename - 文件名 * @param {string} media.subfolder - 子文件夹路径(可选) * @param {string} media.type - 文件类型(output/temp/input) * @param {Object} instance - ComfyUI实例对象 * @returns {string} 上传后的文件URL */ async uploadImage(media, instance) { const comfyuiOutputDir = process.env.COMFYUI_OUTPUT_DIR; // ========== 方式一:本地文件直接上传 ========== if (comfyuiOutputDir) { // 构建本地文件完整路径 let localFilePath = path.join(comfyuiOutputDir, media.filename); if (media.subfolder) { localFilePath = path.join(comfyuiOutputDir, media.subfolder, media.filename); } // 检查本地文件是否存在 if (fs.existsSync(localFilePath)) { logger.info(`从本地目录读取文件: ${localFilePath}`); // 直接上传本地文件 const uploadResult = await fileUploader.uploadToExternalServer(localFilePath, media.filename); return uploadResult.url || uploadResult.data?.url; } else { logger.warn(`本地文件不存在: ${localFilePath}, 回退到 HTTP API`); } } // ========== 方式二:通过HTTP API获取文件 ========== // 构建ComfyUI /view API的URL // API格式: http://host:port/view?filename=xxx&subfolder=xxx&type=output const mediaUrl = `${instance.apiUrl}/view?filename=${media.filename}&subfolder=${media.subfolder || ''}&type=${media.type}`; // 通过HTTP请求获取文件内容(二进制数据) const response = await axios.get(mediaUrl, { responseType: 'arraybuffer' }); // 创建本地临时目录用于存放下载的文件 const uploadsDir = path.resolve(process.cwd(), 'uploads'); if (!fs.existsSync(uploadsDir)) { fs.mkdirSync(uploadsDir, { recursive: true }); } // 从文件名中提取扩展名,或使用media.type let ext = media.type; if (media.filename) { const match = media.filename.match(/\.([a-zA-Z0-9]+)$/); if (match) { ext = match[1]; } } // 生成唯一的临时文件名,避免冲突 const tempPath = path.join(uploadsDir, `${uuidv4()}.${ext}`); // 将下载的文件内容写入临时文件 await fs.promises.writeFile(tempPath, response.data); // 上传临时文件到外部服务器 const uploadResult = await fileUploader.uploadToExternalServer(tempPath, media.filename); // 上传完成后删除临时文件,释放磁盘空间 await fs.promises.unlink(tempPath); return uploadResult.url || uploadResult.data?.url; } /** * 发送Webhook回调通知 * * 说明: * - 任务完成或失败时,通过webhook通知调用方 * - 成功时会重新处理一遍输出文件并上传 * - 失败时直接返回错误信息 * * 回调消息格式: * { * event: 'TASK_END', * taskId: 'xxx', * eventData: JSON.stringify({ * code: 0 | 1, // 0成功,1失败 * msg: 'success' | 'error message', * data: [...] // 成功时包含文件URL列表 * }) * } * * @param {Object} task - 任务对象 * @param {Object|null} resultData - 成功时的结果数据(包含output) * @param {string|null} error - 失败时的错误信息 */ async sendWebhookCallback(task, resultData, error = null) { if (!task.webhookUrl) { return; } let eventData; if (error) { // 失败情况:返回错误信息 eventData = JSON.stringify({ code: 1, msg: error, data: [] }); } else { // 成功情况:重新处理输出文件并上传 const processedData = await this.processHistoryOutputs(resultData?.output, task.instanceId); eventData = JSON.stringify({ code: 0, msg: 'success', data: processedData }); } // 构建回调消息 const callbackMessage = { event: 'TASK_END', taskId: task.queueTaskId || task.id, instanceId: task.instanceId, // 添加 instanceId 字段 eventData }; try { logger.info(`发送Webhook回调到: ${task.webhookUrl}`); await axios.post(task.webhookUrl, callbackMessage, { timeout: 10000 }); logger.info(`Webhook回调发送成功: ${task.id}`); } catch (error) { logger.error('发送Webhook回调失败:', error.message); } } async getTask(taskId) { return this.tasks.get(taskId) || null; } async getTasks(status = null) { let tasks = Array.from(this.tasks.values()); if (status) { tasks = tasks.filter(t => t.status === status); } return tasks; } async cancelTask(taskId) { const task = this.tasks.get(taskId); if (!task) { return false; } if (task.status === 'completed' || task.status === 'failed') { return false; } if (task.status === 'running' && task.promptId) { try { const instance = clusterManager.getInstance(task.instanceId); if (instance) { await axios.post(`${instance.apiUrl}/interrupt`); } } catch (error) { logger.error(`中断任务失败:`, error); } } task.status = 'cancelled'; task.completedAt = new Date().toISOString(); this.tasks.set(taskId, task); logger.info(`任务 ${taskId} 已取消`); return true; } async getTaskStatus(taskId) { const task = this.tasks.get(taskId); if (!task) { return null; } return { id: task.id, promptId: task.promptId, status: task.status, progress: task.progress, instanceId: task.instanceId, createdAt: task.createdAt, startedAt: task.startedAt, completedAt: task.completedAt, error: task.error }; } } export default new TaskForwarder();