import redis from './index.js'; import { modelData, platformData } from '../config/Config.js'; import dotenv from 'dotenv'; dotenv.config(); // 日志工具函数 const logger = { info: (message) => { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] INFO: ${message}`); }, error: (message, error) => { const timestamp = new Date().toISOString(); console.error(`[${timestamp}] ERROR: ${message}`, error || ''); }, debug: (message) => { const timestamp = new Date().toISOString(); console.debug(`[${timestamp}] DEBUG: ${message}`); } }; class InitQueues { constructor() { this.prefix = process.env.PROJECT_PREFIX || 'default'; this.processPolling = `${this.prefix}:process:Polling`; // 轮询处理队列名 {remoteTaskId: "JSON.stringify{taskid, platform, AIGC}"} this.processCallback = `${this.prefix}:process:callback`; // 回调队列名 this.resultName = `${this.prefix}:result:queue`; // 结果队列名 { taskid:"JSON.stringify(result)" } this.resultList = `${this.prefix}:result:list`; // 结果列表,存储任务ID this.callback = `${this.prefix}:callback`; // 回调队列名 this.errorName = `${this.prefix}:error:queue`; // 错误队列名 this.errorList = `${this.prefix}:error:list`; // 错误列表,存储任务ID this.initInfoKey = `${this.prefix}:InitInfo`; // 初始化信息键名 this.pendingMessages = `${this.prefix}:pending:messages`; // 待发送消息队列,存储断连时未发送的消息 } // 初始化各个队列 async init(){ logger.info('开始初始化队列...'); const waitQueues = []; // 存储等待队列名称 const platforms = {}; // 存储平台相关信息 try { // 初始化各个平台的等待队列 for (const [AIGC, modelObj] of Object.entries(modelData)) { for (const [platformName, info] of Object.entries(modelObj)) { // 初始化等待队列名称,不需要实际创建空队列(Redis会自动创建) const waitName = `${AIGC}:${platformName}:wait`; waitQueues.push(waitName); logger.debug(`等待队列创建成功: ${waitName}`); // 记录各平台信息 const platform = { AIGC, platformName, WQtasks: 0, PQtasks: 0, MAX_CONCURRENT: info.concurrency, // 最大并发数 waitQueue: waitName }; platforms[`${AIGC}:${platformName}`] = platform; } } // 检查InitInfo是否已存在 const existingInfo = await redis.json.get(this.initInfoKey, { path: '$' }); if (!existingInfo) { // 初始化配置信息,只需要设置一次 const initInfo = { waitQueues: waitQueues, processPolling: this.processPolling, processCallback: this.processCallback, resultName: this.resultName, PQtasksALL: 0, // 初始值为0 RQtasksALL: 0, CQtasksALL: 0, EQtaskALL: 0, platforms: platforms }; await redis.json.set(this.initInfoKey, '$', initInfo); logger.info('Redis初始化完成,创建了InitInfo配置'); } else { logger.info('Redis已存在初始化信息,检查并更新配置'); // 清理等待队列中的旧任务ID for (const waitName of waitQueues) { const queueLength = await redis.lLen(waitName); if (queueLength > 0) { await redis.del(waitName); logger.info(`已清理等待队列 ${waitName},删除了 ${queueLength} 个旧任务`); } } // 更新各平台配置,确保MAX_CONCURRENT和其他属性正确设置 for (const [key, platform] of Object.entries(existingInfo[0].platforms)) { // 从新生成的platforms中获取最新配置 const newPlatformConfig = platforms[key]; if (newPlatformConfig) { // 更新平台配置,包括MAX_CONCURRENT await redis.json.set(this.initInfoKey, `$.platforms.${key}`, { ...platform, MAX_CONCURRENT: newPlatformConfig.MAX_CONCURRENT, WQtasks: 0, // 清零等待队列任务数 waitQueue: newPlatformConfig.waitQueue }); logger.debug(`已更新平台 ${key} 配置:MAX_CONCURRENT=${newPlatformConfig.MAX_CONCURRENT}`); } } logger.info('已更新各平台配置并清零WQtasks计数器'); } logger.info('队列初始化完成'); } catch (error) { logger.error('队列初始化失败:', error); throw error; // 抛出错误以便上层处理 } } // // 加载现有配置(用于Worker线程) // async loadExistingConfig() { // // 重新加载平台信息 // waitQueues = []; // platforms.clear(); // for (const [AIGC, modelObj] of Object.entries(modelData)) { // for (const [KEY, info] of Object.entries(modelObj)) { // const platformName = KEY; // const waitName = this.toQueue(AIGC, platformName, 'wait'); // waitQueues.push(waitName); // // 从Redis获取当前任务数信息,这里简化处理 // const platform = { // AIGC, // platformName, // WQtasks: 0, // PQtasks: 0, // MAX_CONCURRENT: info.concurrency, // waitQueue: waitName // } // platforms.set(AIGC + ':' + platformName, platform) // } // } // } // 获取队列名称 toQueue(AIGC, model, queue) { return `${AIGC}:${model}:${queue}`; } // // 获取等待队列名称 // async getWaitQueueNames(){ // const res = await redis.json.get("InitInfo", { path: '$.waitQueues' }) // return res[0] // } // // 获取结果队列名称 // async getResultName(){ // const res = await redis.json.get("InitInfo", { path: '$.resultName' }) // return res[0] // } // 获取平台相关信息 async getPlatforms() { try { const res = await redis.json.get(this.initInfoKey, { path: '$.platforms' }); return res ? res[0] : {}; } catch (error) { logger.error('获取平台信息失败:', error); return {}; } } // 获取轮询的处理队列总任务数 async getPQtasksALL() { try { const res = await redis.json.get(this.initInfoKey, { path: '$.PQtasksALL' }); return res ? res[0] : 0; } catch (error) { logger.error('获取轮询处理队列总任务数失败:', error); return 0; } } // 获取结果队列任务数 async getRQtasksALL() { try { const res = await redis.json.get(this.initInfoKey, { path: '$.RQtasksALL' }); return res ? res[0] : 0; } catch (error) { logger.error('获取结果队列任务数失败:', error); return 0; } } // 获取回调队列任务数 async getCQtasksALL() { try { const res = await redis.json.get(this.initInfoKey, { path: '$.CQtasksALL' }); return res ? res[0] : 0; } catch (error) { logger.error('获取回调队列任务数失败:', error); return 0; } } // 获取错误队列任务数 async getEQtaskALL() { try { const res = await redis.json.get(this.initInfoKey, { path: '$.EQtaskALL' }); return res ? res[0] : 0; } catch (error) { logger.error('获取错误队列任务数失败:', error); return 0; } } // 增加平台相关信息处理队列 正在处理的任务数 async addPlatformsProcess(taskCountMap) { try { const multi = redis.multi(); let PQcount = 0; for (const [key, count] of taskCountMap.entries()) { const [aigc, platform] = key.split(':'); multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.PQtasks`, count); if (platformData.polling.includes(platform)) { PQcount++; } logger.debug(`增加相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`); } multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', PQcount); await multi.exec(); // 等待multi命令执行完成 } catch (error) { logger.error('增加平台处理队列任务数失败:', error); } } // 减少单个平台处理队列任务数(带边界检查) async reducePlatformsProcessSingle(platformKey) { const key = `${this.prefix}:platforms:${platformKey}`; try { const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` }); if (!platformInfo || !platformInfo[0]) { logger.warn(`[CapacityManager] 平台不存在: ${platformKey}`); return 0; } const current = platformInfo[0].PQtasks; let newValue = parseInt(current) - 1; if (newValue < 0) { logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`); newValue = 0; } await redis.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue); logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`); return newValue; } catch (error) { logger.error(`[CapacityManager] 更新 PQtasks 失败:`, error); throw error; } } // 减少平台相关信息处理队列 正在处理的任务数 async reducePlatformsProcess(taskCountMap) { try { const multi = redis.multi(); let PQcount = 0; for (const [key, count] of taskCountMap.entries()) { const [aigc, platform] = key.split(':'); const platformKey = key; const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` }); if (platformInfo && platformInfo[0]) { const current = platformInfo[0].PQtasks; let newValue = parseInt(current) - count; if (newValue < 0) { logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`); newValue = 0; } multi.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue); logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`); } if (platformData.polling.includes(platform)) { PQcount++; } logger.debug(`减少相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`); } multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', -PQcount); multi.json.numIncrBy(this.initInfoKey, '$.RQtasksALL', PQcount); await multi.exec(); } catch (error) { logger.error('减少平台处理队列任务数失败:', error); } } // 增加平台等待队列的 等待中任务数 async addPlatformsWait(aigc, platform, count) { try { await redis.json.numIncrBy(this.initInfoKey, `$.platforms.${aigc}:${platform}.WQtasks`, count); logger.debug(`增加平台等待队列任务数: AIGC=${aigc}, Platform=${platform}, Count=${count}`); } catch (error) { logger.error('增加平台等待队列任务数失败:', error); } } // 减少平台等待队列的 等待中任务数 async reducePlatformsWait(taskCountMap) { try { const multi = redis.multi(); for (const [key, count] of taskCountMap.entries()) { multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.WQtasks`, -count); } await multi.exec(); // 等待multi命令执行完成 } catch (error) { logger.error('减少平台等待队列任务数失败:', error); } } // 增加回调队列的 完成任务数 async addCallbackRQtasks(count) { try { await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', count); logger.debug(`增加回调队列任务数: ${count}`); } catch (error) { logger.error('增加回调队列任务数失败:', error); } } // 减少回调队列任务数 async reduceCQtasksALL(count) { try { await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', -count); logger.debug(`减少回调队列任务数: ${count}`); } catch (error) { logger.error('减少回调队列任务数失败:', error); } } // 增加错误队列的 完成任务数 async addEQtaskALL(count) { try { await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', count); logger.debug(`增加错误队列任务数: ${count}`); } catch (error) { logger.error('增加错误队列任务数失败:', error); } } // 减少错误队列任务数 async reduceEQtaskALL(count) { try { await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', -count); logger.debug(`减少错误队列任务数: ${count}`); } catch (error) { logger.error('减少错误队列任务数失败:', error); } } } export default new InitQueues();