378 lines
13 KiB
JavaScript
378 lines
13 KiB
JavaScript
import redis from './index.js';
|
||
import { modelData, platformData } from '../config/Config.js';
|
||
import dotenv from 'dotenv';
|
||
|
||
dotenv.config();
|
||
|
||
// 日志工具函数
|
||
const logger = {
|
||
info: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.log(`[${timestamp}] INFO: ${message}`);
|
||
},
|
||
error: (message, error) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
|
||
},
|
||
debug: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.debug(`[${timestamp}] DEBUG: ${message}`);
|
||
}
|
||
};
|
||
|
||
class InitQueues {
|
||
constructor() {
|
||
this.prefix = process.env.PROJECT_PREFIX || 'default';
|
||
this.processPolling = `${this.prefix}:process:Polling`; // 轮询处理队列名 {remoteTaskId: "JSON.stringify{taskid, platform, AIGC}"}
|
||
this.processCallback = `${this.prefix}:process:callback`; // 回调队列名
|
||
this.resultName = `${this.prefix}:result:queue`; // 结果队列名 { taskid:"JSON.stringify(result)" }
|
||
this.resultList = `${this.prefix}:result:list`; // 结果列表,存储任务ID
|
||
this.callback = `${this.prefix}:callback`; // 回调队列名
|
||
this.errorName = `${this.prefix}:error:queue`; // 错误队列名
|
||
this.errorList = `${this.prefix}:error:list`; // 错误列表,存储任务ID
|
||
this.initInfoKey = `${this.prefix}:InitInfo`; // 初始化信息键名
|
||
this.pendingMessages = `${this.prefix}:pending:messages`; // 待发送消息队列,存储断连时未发送的消息
|
||
}
|
||
|
||
// 初始化各个队列
|
||
async init(){
|
||
logger.info('开始初始化队列...');
|
||
|
||
const waitQueues = []; // 存储等待队列名称
|
||
const platforms = {}; // 存储平台相关信息
|
||
|
||
try {
|
||
// 初始化各个平台的等待队列
|
||
for (const [AIGC, modelObj] of Object.entries(modelData)) {
|
||
for (const [platformName, info] of Object.entries(modelObj)) {
|
||
// 初始化等待队列名称,不需要实际创建空队列(Redis会自动创建)
|
||
const waitName = `${AIGC}:${platformName}:wait`;
|
||
waitQueues.push(waitName);
|
||
logger.debug(`等待队列创建成功: ${waitName}`);
|
||
|
||
// 记录各平台信息
|
||
const platform = {
|
||
AIGC,
|
||
platformName,
|
||
WQtasks: 0,
|
||
PQtasks: 0,
|
||
MAX_CONCURRENT: info.concurrency, // 最大并发数
|
||
waitQueue: waitName
|
||
};
|
||
platforms[`${AIGC}:${platformName}`] = platform;
|
||
}
|
||
}
|
||
|
||
// 检查InitInfo是否已存在
|
||
const existingInfo = await redis.json.get(this.initInfoKey, { path: '$' });
|
||
if (!existingInfo) {
|
||
// 初始化配置信息,只需要设置一次
|
||
const initInfo = {
|
||
waitQueues: waitQueues,
|
||
processPolling: this.processPolling,
|
||
processCallback: this.processCallback,
|
||
resultName: this.resultName,
|
||
PQtasksALL: 0, // 初始值为0
|
||
RQtasksALL: 0,
|
||
CQtasksALL: 0,
|
||
EQtaskALL: 0,
|
||
platforms: platforms
|
||
};
|
||
|
||
await redis.json.set(this.initInfoKey, '$', initInfo);
|
||
logger.info('Redis初始化完成,创建了InitInfo配置');
|
||
} else {
|
||
logger.info('Redis已存在初始化信息,检查并更新配置');
|
||
|
||
// 清理等待队列中的旧任务ID
|
||
for (const waitName of waitQueues) {
|
||
const queueLength = await redis.lLen(waitName);
|
||
if (queueLength > 0) {
|
||
await redis.del(waitName);
|
||
logger.info(`已清理等待队列 ${waitName},删除了 ${queueLength} 个旧任务`);
|
||
}
|
||
}
|
||
|
||
// 更新各平台配置,确保MAX_CONCURRENT和其他属性正确设置
|
||
for (const [key, platform] of Object.entries(existingInfo[0].platforms)) {
|
||
// 从新生成的platforms中获取最新配置
|
||
const newPlatformConfig = platforms[key];
|
||
if (newPlatformConfig) {
|
||
// 更新平台配置,包括MAX_CONCURRENT
|
||
await redis.json.set(this.initInfoKey, `$.platforms.${key}`, {
|
||
...platform,
|
||
MAX_CONCURRENT: newPlatformConfig.MAX_CONCURRENT,
|
||
WQtasks: 0, // 清零等待队列任务数
|
||
waitQueue: newPlatformConfig.waitQueue
|
||
});
|
||
logger.debug(`已更新平台 ${key} 配置:MAX_CONCURRENT=${newPlatformConfig.MAX_CONCURRENT}`);
|
||
}
|
||
}
|
||
|
||
logger.info('已更新各平台配置并清零WQtasks计数器');
|
||
}
|
||
|
||
logger.info('队列初始化完成');
|
||
} catch (error) {
|
||
logger.error('队列初始化失败:', error);
|
||
throw error; // 抛出错误以便上层处理
|
||
}
|
||
}
|
||
|
||
// // 加载现有配置(用于Worker线程)
|
||
// async loadExistingConfig() {
|
||
// // 重新加载平台信息
|
||
// waitQueues = [];
|
||
// platforms.clear();
|
||
|
||
// for (const [AIGC, modelObj] of Object.entries(modelData)) {
|
||
// for (const [KEY, info] of Object.entries(modelObj)) {
|
||
// const platformName = KEY;
|
||
// const waitName = this.toQueue(AIGC, platformName, 'wait');
|
||
// waitQueues.push(waitName);
|
||
|
||
// // 从Redis获取当前任务数信息,这里简化处理
|
||
// const platform = {
|
||
// AIGC,
|
||
// platformName,
|
||
// WQtasks: 0,
|
||
// PQtasks: 0,
|
||
// MAX_CONCURRENT: info.concurrency,
|
||
// waitQueue: waitName
|
||
// }
|
||
// platforms.set(AIGC + ':' + platformName, platform)
|
||
// }
|
||
// }
|
||
// }
|
||
|
||
// 获取队列名称
|
||
toQueue(AIGC, model, queue) {
|
||
return `${AIGC}:${model}:${queue}`;
|
||
}
|
||
|
||
// // 获取等待队列名称
|
||
// async getWaitQueueNames(){
|
||
// const res = await redis.json.get("InitInfo", { path: '$.waitQueues' })
|
||
// return res[0]
|
||
// }
|
||
|
||
// // 获取结果队列名称
|
||
// async getResultName(){
|
||
// const res = await redis.json.get("InitInfo", { path: '$.resultName' })
|
||
// return res[0]
|
||
// }
|
||
|
||
// 获取平台相关信息
|
||
async getPlatforms() {
|
||
try {
|
||
const res = await redis.json.get(this.initInfoKey, { path: '$.platforms' });
|
||
return res ? res[0] : {};
|
||
} catch (error) {
|
||
logger.error('获取平台信息失败:', error);
|
||
return {};
|
||
}
|
||
}
|
||
|
||
// 获取轮询的处理队列总任务数
|
||
async getPQtasksALL() {
|
||
try {
|
||
const res = await redis.json.get(this.initInfoKey, { path: '$.PQtasksALL' });
|
||
return res ? res[0] : 0;
|
||
} catch (error) {
|
||
logger.error('获取轮询处理队列总任务数失败:', error);
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
// 获取结果队列任务数
|
||
async getRQtasksALL() {
|
||
try {
|
||
const res = await redis.json.get(this.initInfoKey, { path: '$.RQtasksALL' });
|
||
return res ? res[0] : 0;
|
||
} catch (error) {
|
||
logger.error('获取结果队列任务数失败:', error);
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
// 获取回调队列任务数
|
||
async getCQtasksALL() {
|
||
try {
|
||
const res = await redis.json.get(this.initInfoKey, { path: '$.CQtasksALL' });
|
||
return res ? res[0] : 0;
|
||
} catch (error) {
|
||
logger.error('获取回调队列任务数失败:', error);
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
// 获取错误队列任务数
|
||
async getEQtaskALL() {
|
||
try {
|
||
const res = await redis.json.get(this.initInfoKey, { path: '$.EQtaskALL' });
|
||
return res ? res[0] : 0;
|
||
} catch (error) {
|
||
logger.error('获取错误队列任务数失败:', error);
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
// 增加平台相关信息处理队列 正在处理的任务数
|
||
async addPlatformsProcess(taskCountMap) {
|
||
try {
|
||
const multi = redis.multi();
|
||
let PQcount = 0;
|
||
|
||
for (const [key, count] of taskCountMap.entries()) {
|
||
const [aigc, platform] = key.split(':');
|
||
multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.PQtasks`, count);
|
||
|
||
if (platformData.polling.includes(platform)) {
|
||
PQcount++;
|
||
}
|
||
logger.debug(`增加相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
|
||
}
|
||
|
||
multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', PQcount);
|
||
await multi.exec(); // 等待multi命令执行完成
|
||
} catch (error) {
|
||
logger.error('增加平台处理队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 减少单个平台处理队列任务数(带边界检查)
|
||
async reducePlatformsProcessSingle(platformKey) {
|
||
const key = `${this.prefix}:platforms:${platformKey}`;
|
||
|
||
try {
|
||
const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` });
|
||
if (!platformInfo || !platformInfo[0]) {
|
||
logger.warn(`[CapacityManager] 平台不存在: ${platformKey}`);
|
||
return 0;
|
||
}
|
||
|
||
const current = platformInfo[0].PQtasks;
|
||
let newValue = parseInt(current) - 1;
|
||
|
||
if (newValue < 0) {
|
||
logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`);
|
||
newValue = 0;
|
||
}
|
||
|
||
await redis.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue);
|
||
logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`);
|
||
|
||
return newValue;
|
||
} catch (error) {
|
||
logger.error(`[CapacityManager] 更新 PQtasks 失败:`, error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
// 减少平台相关信息处理队列 正在处理的任务数
|
||
async reducePlatformsProcess(taskCountMap) {
|
||
try {
|
||
const multi = redis.multi();
|
||
let PQcount = 0;
|
||
|
||
for (const [key, count] of taskCountMap.entries()) {
|
||
const [aigc, platform] = key.split(':');
|
||
|
||
const platformKey = key;
|
||
const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` });
|
||
if (platformInfo && platformInfo[0]) {
|
||
const current = platformInfo[0].PQtasks;
|
||
let newValue = parseInt(current) - count;
|
||
|
||
if (newValue < 0) {
|
||
logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`);
|
||
newValue = 0;
|
||
}
|
||
|
||
multi.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue);
|
||
logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`);
|
||
}
|
||
|
||
if (platformData.polling.includes(platform)) {
|
||
PQcount++;
|
||
}
|
||
logger.debug(`减少相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
|
||
}
|
||
|
||
multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', -PQcount);
|
||
multi.json.numIncrBy(this.initInfoKey, '$.RQtasksALL', PQcount);
|
||
await multi.exec();
|
||
} catch (error) {
|
||
logger.error('减少平台处理队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 增加平台等待队列的 等待中任务数
|
||
async addPlatformsWait(aigc, platform, count) {
|
||
try {
|
||
await redis.json.numIncrBy(this.initInfoKey, `$.platforms.${aigc}:${platform}.WQtasks`, count);
|
||
logger.debug(`增加平台等待队列任务数: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
|
||
} catch (error) {
|
||
logger.error('增加平台等待队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 减少平台等待队列的 等待中任务数
|
||
async reducePlatformsWait(taskCountMap) {
|
||
try {
|
||
const multi = redis.multi();
|
||
|
||
for (const [key, count] of taskCountMap.entries()) {
|
||
multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.WQtasks`, -count);
|
||
}
|
||
|
||
await multi.exec(); // 等待multi命令执行完成
|
||
} catch (error) {
|
||
logger.error('减少平台等待队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 增加回调队列的 完成任务数
|
||
async addCallbackRQtasks(count) {
|
||
try {
|
||
await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', count);
|
||
logger.debug(`增加回调队列任务数: ${count}`);
|
||
} catch (error) {
|
||
logger.error('增加回调队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 减少回调队列任务数
|
||
async reduceCQtasksALL(count) {
|
||
try {
|
||
await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', -count);
|
||
logger.debug(`减少回调队列任务数: ${count}`);
|
||
} catch (error) {
|
||
logger.error('减少回调队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 增加错误队列的 完成任务数
|
||
async addEQtaskALL(count) {
|
||
try {
|
||
await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', count);
|
||
logger.debug(`增加错误队列任务数: ${count}`);
|
||
} catch (error) {
|
||
logger.error('增加错误队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
// 减少错误队列任务数
|
||
async reduceEQtaskALL(count) {
|
||
try {
|
||
await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', -count);
|
||
logger.debug(`减少错误队列任务数: ${count}`);
|
||
} catch (error) {
|
||
logger.error('减少错误队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
export default new InitQueues();
|