shuzhiren-comfyui/任务队列后端/redis/initQueue.js

378 lines
13 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import redis from './index.js';
import { modelData, platformData } from '../config/Config.js';
import dotenv from 'dotenv';
dotenv.config();
// 日志工具函数
const logger = {
info: (message) => {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] INFO: ${message}`);
},
error: (message, error) => {
const timestamp = new Date().toISOString();
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
},
debug: (message) => {
const timestamp = new Date().toISOString();
console.debug(`[${timestamp}] DEBUG: ${message}`);
}
};
class InitQueues {
constructor() {
this.prefix = process.env.PROJECT_PREFIX || 'default';
this.processPolling = `${this.prefix}:process:Polling`; // 轮询处理队列名 {remoteTaskId: "JSON.stringify{taskid, platform, AIGC}"}
this.processCallback = `${this.prefix}:process:callback`; // 回调队列名
this.resultName = `${this.prefix}:result:queue`; // 结果队列名 { taskid:"JSON.stringify(result)" }
this.resultList = `${this.prefix}:result:list`; // 结果列表存储任务ID
this.callback = `${this.prefix}:callback`; // 回调队列名
this.errorName = `${this.prefix}:error:queue`; // 错误队列名
this.errorList = `${this.prefix}:error:list`; // 错误列表存储任务ID
this.initInfoKey = `${this.prefix}:InitInfo`; // 初始化信息键名
this.pendingMessages = `${this.prefix}:pending:messages`; // 待发送消息队列,存储断连时未发送的消息
}
// 初始化各个队列
async init(){
logger.info('开始初始化队列...');
const waitQueues = []; // 存储等待队列名称
const platforms = {}; // 存储平台相关信息
try {
// 初始化各个平台的等待队列
for (const [AIGC, modelObj] of Object.entries(modelData)) {
for (const [platformName, info] of Object.entries(modelObj)) {
// 初始化等待队列名称不需要实际创建空队列Redis会自动创建
const waitName = `${AIGC}:${platformName}:wait`;
waitQueues.push(waitName);
logger.debug(`等待队列创建成功: ${waitName}`);
// 记录各平台信息
const platform = {
AIGC,
platformName,
WQtasks: 0,
PQtasks: 0,
MAX_CONCURRENT: info.concurrency, // 最大并发数
waitQueue: waitName
};
platforms[`${AIGC}:${platformName}`] = platform;
}
}
// 检查InitInfo是否已存在
const existingInfo = await redis.json.get(this.initInfoKey, { path: '$' });
if (!existingInfo) {
// 初始化配置信息,只需要设置一次
const initInfo = {
waitQueues: waitQueues,
processPolling: this.processPolling,
processCallback: this.processCallback,
resultName: this.resultName,
PQtasksALL: 0, // 初始值为0
RQtasksALL: 0,
CQtasksALL: 0,
EQtaskALL: 0,
platforms: platforms
};
await redis.json.set(this.initInfoKey, '$', initInfo);
logger.info('Redis初始化完成创建了InitInfo配置');
} else {
logger.info('Redis已存在初始化信息检查并更新配置');
// 清理等待队列中的旧任务ID
for (const waitName of waitQueues) {
const queueLength = await redis.lLen(waitName);
if (queueLength > 0) {
await redis.del(waitName);
logger.info(`已清理等待队列 ${waitName},删除了 ${queueLength} 个旧任务`);
}
}
// 更新各平台配置确保MAX_CONCURRENT和其他属性正确设置
for (const [key, platform] of Object.entries(existingInfo[0].platforms)) {
// 从新生成的platforms中获取最新配置
const newPlatformConfig = platforms[key];
if (newPlatformConfig) {
// 更新平台配置包括MAX_CONCURRENT
await redis.json.set(this.initInfoKey, `$.platforms.${key}`, {
...platform,
MAX_CONCURRENT: newPlatformConfig.MAX_CONCURRENT,
WQtasks: 0, // 清零等待队列任务数
waitQueue: newPlatformConfig.waitQueue
});
logger.debug(`已更新平台 ${key} 配置MAX_CONCURRENT=${newPlatformConfig.MAX_CONCURRENT}`);
}
}
logger.info('已更新各平台配置并清零WQtasks计数器');
}
logger.info('队列初始化完成');
} catch (error) {
logger.error('队列初始化失败:', error);
throw error; // 抛出错误以便上层处理
}
}
// // 加载现有配置用于Worker线程
// async loadExistingConfig() {
// // 重新加载平台信息
// waitQueues = [];
// platforms.clear();
// for (const [AIGC, modelObj] of Object.entries(modelData)) {
// for (const [KEY, info] of Object.entries(modelObj)) {
// const platformName = KEY;
// const waitName = this.toQueue(AIGC, platformName, 'wait');
// waitQueues.push(waitName);
// // 从Redis获取当前任务数信息这里简化处理
// const platform = {
// AIGC,
// platformName,
// WQtasks: 0,
// PQtasks: 0,
// MAX_CONCURRENT: info.concurrency,
// waitQueue: waitName
// }
// platforms.set(AIGC + ':' + platformName, platform)
// }
// }
// }
// 获取队列名称
toQueue(AIGC, model, queue) {
return `${AIGC}:${model}:${queue}`;
}
// // 获取等待队列名称
// async getWaitQueueNames(){
// const res = await redis.json.get("InitInfo", { path: '$.waitQueues' })
// return res[0]
// }
// // 获取结果队列名称
// async getResultName(){
// const res = await redis.json.get("InitInfo", { path: '$.resultName' })
// return res[0]
// }
// 获取平台相关信息
async getPlatforms() {
try {
const res = await redis.json.get(this.initInfoKey, { path: '$.platforms' });
return res ? res[0] : {};
} catch (error) {
logger.error('获取平台信息失败:', error);
return {};
}
}
// 获取轮询的处理队列总任务数
async getPQtasksALL() {
try {
const res = await redis.json.get(this.initInfoKey, { path: '$.PQtasksALL' });
return res ? res[0] : 0;
} catch (error) {
logger.error('获取轮询处理队列总任务数失败:', error);
return 0;
}
}
// 获取结果队列任务数
async getRQtasksALL() {
try {
const res = await redis.json.get(this.initInfoKey, { path: '$.RQtasksALL' });
return res ? res[0] : 0;
} catch (error) {
logger.error('获取结果队列任务数失败:', error);
return 0;
}
}
// 获取回调队列任务数
async getCQtasksALL() {
try {
const res = await redis.json.get(this.initInfoKey, { path: '$.CQtasksALL' });
return res ? res[0] : 0;
} catch (error) {
logger.error('获取回调队列任务数失败:', error);
return 0;
}
}
// 获取错误队列任务数
async getEQtaskALL() {
try {
const res = await redis.json.get(this.initInfoKey, { path: '$.EQtaskALL' });
return res ? res[0] : 0;
} catch (error) {
logger.error('获取错误队列任务数失败:', error);
return 0;
}
}
// 增加平台相关信息处理队列 正在处理的任务数
async addPlatformsProcess(taskCountMap) {
try {
const multi = redis.multi();
let PQcount = 0;
for (const [key, count] of taskCountMap.entries()) {
const [aigc, platform] = key.split(':');
multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.PQtasks`, count);
if (platformData.polling.includes(platform)) {
PQcount++;
}
logger.debug(`增加相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
}
multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', PQcount);
await multi.exec(); // 等待multi命令执行完成
} catch (error) {
logger.error('增加平台处理队列任务数失败:', error);
}
}
// 减少单个平台处理队列任务数(带边界检查)
async reducePlatformsProcessSingle(platformKey) {
const key = `${this.prefix}:platforms:${platformKey}`;
try {
const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` });
if (!platformInfo || !platformInfo[0]) {
logger.warn(`[CapacityManager] 平台不存在: ${platformKey}`);
return 0;
}
const current = platformInfo[0].PQtasks;
let newValue = parseInt(current) - 1;
if (newValue < 0) {
logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`);
newValue = 0;
}
await redis.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue);
logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`);
return newValue;
} catch (error) {
logger.error(`[CapacityManager] 更新 PQtasks 失败:`, error);
throw error;
}
}
// 减少平台相关信息处理队列 正在处理的任务数
async reducePlatformsProcess(taskCountMap) {
try {
const multi = redis.multi();
let PQcount = 0;
for (const [key, count] of taskCountMap.entries()) {
const [aigc, platform] = key.split(':');
const platformKey = key;
const platformInfo = await redis.json.get(this.initInfoKey, { path: `$.platforms.${platformKey}` });
if (platformInfo && platformInfo[0]) {
const current = platformInfo[0].PQtasks;
let newValue = parseInt(current) - count;
if (newValue < 0) {
logger.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`);
newValue = 0;
}
multi.json.set(this.initInfoKey, `$.platforms.${platformKey}.PQtasks`, newValue);
logger.debug(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`);
}
if (platformData.polling.includes(platform)) {
PQcount++;
}
logger.debug(`减少相关平台处理队列: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
}
multi.json.numIncrBy(this.initInfoKey, '$.PQtasksALL', -PQcount);
multi.json.numIncrBy(this.initInfoKey, '$.RQtasksALL', PQcount);
await multi.exec();
} catch (error) {
logger.error('减少平台处理队列任务数失败:', error);
}
}
// 增加平台等待队列的 等待中任务数
async addPlatformsWait(aigc, platform, count) {
try {
await redis.json.numIncrBy(this.initInfoKey, `$.platforms.${aigc}:${platform}.WQtasks`, count);
logger.debug(`增加平台等待队列任务数: AIGC=${aigc}, Platform=${platform}, Count=${count}`);
} catch (error) {
logger.error('增加平台等待队列任务数失败:', error);
}
}
// 减少平台等待队列的 等待中任务数
async reducePlatformsWait(taskCountMap) {
try {
const multi = redis.multi();
for (const [key, count] of taskCountMap.entries()) {
multi.json.numIncrBy(this.initInfoKey, `$.platforms.${key}.WQtasks`, -count);
}
await multi.exec(); // 等待multi命令执行完成
} catch (error) {
logger.error('减少平台等待队列任务数失败:', error);
}
}
// 增加回调队列的 完成任务数
async addCallbackRQtasks(count) {
try {
await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', count);
logger.debug(`增加回调队列任务数: ${count}`);
} catch (error) {
logger.error('增加回调队列任务数失败:', error);
}
}
// 减少回调队列任务数
async reduceCQtasksALL(count) {
try {
await redis.json.numIncrBy(this.initInfoKey, '$.CQtasksALL', -count);
logger.debug(`减少回调队列任务数: ${count}`);
} catch (error) {
logger.error('减少回调队列任务数失败:', error);
}
}
// 增加错误队列的 完成任务数
async addEQtaskALL(count) {
try {
await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', count);
logger.debug(`增加错误队列任务数: ${count}`);
} catch (error) {
logger.error('增加错误队列任务数失败:', error);
}
}
// 减少错误队列任务数
async reduceEQtaskALL(count) {
try {
await redis.json.numIncrBy(this.initInfoKey, '$.EQtaskALL', -count);
logger.debug(`减少错误队列任务数: ${count}`);
} catch (error) {
logger.error('减少错误队列任务数失败:', error);
}
}
}
export default new InitQueues();