import { parentPort, Worker } from 'worker_threads'; import redis from '../../redis/index.js'; import initQueue from '../../redis/initQueue.js'; // Redis key 定义(与 mdWebSocketServer 保持一致) const REDIS_KEYS = { CAPACITY: `${process.env.PROJECT_PREFIX}:md:capacity`, JWT: `${process.env.PROJECT_PREFIX}:md:jwt` }; // 日志工具函数 const logger = { info: (message) => { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] INFO: ${message}`); }, error: (message, error) => { const timestamp = new Date().toISOString(); console.error(`[${timestamp}] ERROR: ${message}`, error || ''); }, debug: (message) => { const timestamp = new Date().toISOString(); console.debug(`[${timestamp}] DEBUG: ${message}`); } }; /** * 从 Redis 读取内部算力信息 * @returns {Promise} 内部可用算力 */ async function getInternalCapacityFromRedis() { try { const capacity = await redis.get(REDIS_KEYS.CAPACITY); console.log(`[MDWebSocketServer] 从 Redis 读取算力信息: ${capacity}`); return capacity ? parseInt(capacity, 10) : 0; } catch (error) { logger.error('从 Redis 读取算力信息失败:', error); return 0; } } /** * 从 Redis 读取 JWT Token * @returns {Promise} JWT Token */ async function getJwtTokenFromRedis() { try { const token = await redis.get(REDIS_KEYS.JWT); console.log(`[MDWebSocketServer] 从 Redis 读取 JWT Token: ${token ? '存在' : '不存在'}`); return token; } catch (error) { logger.error('从 Redis 读取 JWT Token 失败:', error); return null; } } /** * 分发状态管理器 * 用于跟踪 comfyui 任务在内部算力和外部算力之间的分配 */ class DispatchStateManager { constructor() { this.internalCapacity = 0; this.assignedToInternal = 0; this.hasJwtToken = false; } async init() { this.internalCapacity = await getInternalCapacityFromRedis(); const jwtToken = await getJwtTokenFromRedis(); this.hasJwtToken = !!jwtToken; this.assignedToInternal = 0; logger.info(`[DispatchStateManager] 初始化: 内部可用算力=${this.internalCapacity}, JWT=${this.hasJwtToken ? '存在' : '不存在'}`); } getDispatchType(platformName) { if (platformName !== 'comfyui') { return null; } if (this.hasJwtToken && this.assignedToInternal < this.internalCapacity) { this.assignedToInternal++; logger.debug(`[DispatchStateManager] 分配到 messageDispatcher (已分配 ${this.assignedToInternal}/${this.internalCapacity})`); return 'messageDispatcher'; } logger.debug(`[DispatchStateManager] 分配到 runninghub (内部已满或无JWT)`); return 'runninghub'; } } const dispatchStateManager = new DispatchStateManager(); // 创建专门的线程池管理 Worker const generateWorker = new Worker(new URL('./GenerateWorkerManager.js', import.meta.url)); // 判断并发数,获取可进行任务处理的等待队列 async function judgConcurrency() { try { // 获取平台相关信息包括等待队列名与并发数,当前任务数等 const platforms = await initQueue.getPlatforms(); // 储存可进行任务处理的等待队列 const wDeficiency = []; logger.debug('获取到的平台信息:', platforms); // 获取内部可用算力(用于 comfyui 平台) const internalCapacity = await getInternalCapacityFromRedis(); const jwtToken = await getJwtTokenFromRedis(); const hasInternalCapacity = internalCapacity > 0 && !!jwtToken; if (hasInternalCapacity) { logger.info(`[judgConcurrency] 内部可用算力: ${internalCapacity}, JWT: 存在`); } // 检查每个平台的实际队列长度 for(const [aigcPfName, info] of Object.entries(platforms)) { try { // 直接检查 Redis 队列的实际长度 const actualQueueLength = await redis.lLen(info.waitQueue); // logger.debug(`平台 ${aigcPfName} 信息:PQtasks=${info.PQtasks}, MAX_CONCURRENT=${info.MAX_CONCURRENT}, 实际队列长度=${actualQueueLength}`); // 计算总并发能力 let totalCapacity = info.MAX_CONCURRENT; // 对于 comfyui 平台,如果有内部算力,增加可处理任务数 if (info.platformName === 'comfyui' && hasInternalCapacity) { totalCapacity = info.MAX_CONCURRENT + internalCapacity; logger.debug(`[judgConcurrency] comfyui 平台总并发: ${totalCapacity} (外部${info.MAX_CONCURRENT} + 内部${internalCapacity})`); } // 判断是否可以处理任务:总并发未满且队列中有任务 if (info.PQtasks < totalCapacity && actualQueueLength > 0) { let count = totalCapacity - info.PQtasks; // 可处理的任务数不能大于队列实际长度 if(count > actualQueueLength) { count = actualQueueLength; } wDeficiency.push({ aigcPfName, info, count }); // 储存可进行任务处理的等待队列 logger.debug(`平台 ${aigcPfName} 满足处理条件,可处理 ${count} 个任务`); } else { // logger.debug(`平台 ${aigcPfName} 不满足处理条件:PQtasks < MAX_CONCURRENT = ${info.PQtasks < info.MAX_CONCURRENT}, 队列长度 > 0 = ${actualQueueLength > 0}`); } } catch (error) { logger.error(`检查平台 ${aigcPfName} 队列长度失败:`, error); } } return wDeficiency; // 返回可处理的队列列表 } catch (error) { logger.error('判断并发数失败:', error); return []; } } // 批量获取等待队列任务的任务ID(仅获取,不移除) async function getBatchWaitTasksID(platforms) { try { const multi = redis.multi(); // 从等待队列批量获取任务ID,但不立即移除 for(const platform of platforms) { multi.lRange(platform.info.waitQueue, 0, platform.count - 1); } // 执行所有命令 const results = await multi.exec(); console.log('批量获取等待队列任务ID结果:', results); // 将任务ID与处理队列关联 for(let i = 0; i < results.length; i++) { const taskIDs = results[i] || []; const platform = platforms[i]; platform.waitTaskID = taskIDs; } logger.debug('批量获取等待队列任务ID:', platforms); return platforms; } catch (error) { logger.error('批量获取等待队列任务ID失败:', error); return platforms; } } // 批量获取多个等待队列中的任务数据 async function getBatchWaitTasks(aigcPfTasks) { const tasksData = []; try { // 在处理任务前,初始化分发状态管理器(获取最新的内部算力信息) await dispatchStateManager.init(); // 收集所有需要获取的任务ID const allTaskIds = []; const taskIdMap = new Map(); // 用于映射任务ID到平台信息 for(const aigcPfTask of aigcPfTasks) { for(const taskId of aigcPfTask.waitTaskID) { if (taskId) { allTaskIds.push(taskId); taskIdMap.set(taskId, { platformName: aigcPfTask.info.platformName, aigc: aigcPfTask.info.AIGC, aigcPfName: aigcPfTask.aigcPfName }); } } } if (allTaskIds.length === 0) { return tasksData; } // 批量获取任务数据 const multi = redis.multi(); for(const taskId of allTaskIds) { multi.hGetAll(`${initQueue.prefix}:task:${taskId}`); } const results = await multi.exec(); // 处理结果 for(let i = 0; i < results.length; i++) { const taskInfo = results[i]; const taskId = allTaskIds[i]; const platformInfo = taskIdMap.get(taskId); if (taskInfo) { try { // 使用分发状态管理器获取分发类型(前 N 个用 messageDispatcher,剩余用 runninghub) const dispatchType = dispatchStateManager.getDispatchType(platformInfo.platformName); tasksData.push({ backendId: taskInfo.backendId, taskId: taskInfo.taskId, // 单个任务ID platformName: platformInfo.platformName, aigc: platformInfo.aigc, aigcPfName: platformInfo.aigcPfName, taskData: taskInfo.payload, workflowId: taskInfo.workflowId || '', dispatchType: dispatchType, }); // logger.debug(`已获取任务 ${taskId} 数据:platform=${platformInfo.platformName}, aigc=${platformInfo.aigc}`); } catch (error) { logger.error(`解析任务${taskId}数据失败:`, error); } } else { logger.warn(`任务 ${taskId} 数据不存在`); } } // logger.debug('批量获取多个等待队列中的任务数据:', tasksData); return tasksData; } catch (error) { logger.error('批量获取任务数据失败:', error); return tasksData; } } // 批量移除任务ID、更新等待队列计数并增加处理队列任务数 async function updateTaskCounts(wDeficiency) { try { const taskCountMap = new Map(); const multi = redis.multi(); // 1. 准备批量移除任务ID和更新计数 for(const aigcPfTask of wDeficiency) { const { waitTaskID, info } = aigcPfTask; const key = aigcPfTask.aigcPfName; const count = waitTaskID.length; if (count > 0) { // 移除已处理的任务ID multi.lTrim(info.waitQueue, count, -1); // 更新计数映射 if(taskCountMap.has(key)) { taskCountMap.set(key, taskCountMap.get(key) + count); } else { taskCountMap.set(key, count); } } } // 2. 执行Redis命令,移除任务ID await multi.exec(); // 3. 更新平台计数 if (taskCountMap.size > 0) { // 减少平台等待队列的待处理任务数 await initQueue.reducePlatformsWait(taskCountMap); // 增加平台处理队列的正在处理任务数 await initQueue.addPlatformsProcess(taskCountMap); logger.debug('更新任务计数完成'); } } catch (error) { logger.error('更新任务计数失败:', error); } } // 持续执行批量处理 (async () => { while(true) { try { // 判断并发数,获取可进行任务处理的等待队列 const wDeficiency = await judgConcurrency(); // 判断是否有可处理的队列 if(wDeficiency.length > 0) { // logger.debug('可进行任务处理的等待队列:', wDeficiency); logger.info('有可进行处理的队列,数量: ' + wDeficiency.length); console.log(wDeficiency); // 批量获取多个等待队列中的任务ID const tasksWithIds = await getBatchWaitTasksID(wDeficiency); // 通过任务ID批量获取多个等待队列中的任务数据 const tasksData = await getBatchWaitTasks(tasksWithIds); // 更新任务计数 - 无论是否获取到任务数据,都需要更新计数,因为任务ID已经从Redis队列中移除 await updateTaskCounts(tasksWithIds); // 将任务发送给生成 Worker 处理 if (tasksData.length > 0) { logger.info('发送任务给生成Worker处理,数量: ' + tasksData.length); generateWorker.postMessage(tasksData); } } else { // 没有可处理的队列,等待10秒后重试 await new Promise(resolve => setTimeout(resolve, 15000)); logger.debug('没有可处理的队列'); } } catch (error) { logger.error('批量处理任务失败:', error); // 出错后等待5秒再重试 await new Promise(resolve => setTimeout(resolve, 5000)); } } })(); // 监听 generateWorker 的消息 generateWorker.on('message', (message) => { if (message.status === 'completed') { logger.debug('等待任务处理完成'); } }); generateWorker.on('error', (error) => { logger.error('生成 Worker 错误:', error); // 可以考虑重启Worker });