shuzhiren-comfyui/任务队列后端/worker_threads/wait/waiting.js

254 lines
8.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { parentPort, Worker } from 'worker_threads';
import redis from '../../redis/index.js';
import initQueue from '../../redis/initQueue.js';
// 日志工具函数
const logger = {
info: (message) => {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] INFO: ${message}`);
},
error: (message, error) => {
const timestamp = new Date().toISOString();
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
},
debug: (message) => {
const timestamp = new Date().toISOString();
console.debug(`[${timestamp}] DEBUG: ${message}`);
}
};
// 创建专门的线程池管理 Worker
const generateWorker = new Worker(new URL('./GenerateWorkerManager.js', import.meta.url));
// 判断并发数,获取可进行任务处理的等待队列
async function judgConcurrency() {
try {
// 获取平台相关信息包括等待队列名与并发数,当前任务数等
const platforms = await initQueue.getPlatforms();
// 储存可进行任务处理的等待队列
const wDeficiency = [];
logger.debug('获取到的平台信息:', platforms);
// 检查每个平台的实际队列长度
for(const [aigcPfName, info] of Object.entries(platforms)) {
try {
// 直接检查 Redis 队列的实际长度
const actualQueueLength = await redis.lLen(info.waitQueue);
logger.debug(`平台 ${aigcPfName} 信息PQtasks=${info.PQtasks}, MAX_CONCURRENT=${info.MAX_CONCURRENT}, 实际队列长度=${actualQueueLength}`);
// 判断是否可以处理任务:并发数未满且队列中有任务
if (info.PQtasks < info.MAX_CONCURRENT && actualQueueLength > 0) {
let count = info.MAX_CONCURRENT - info.PQtasks;
// 可处理的任务数不能大于队列实际长度
if(count > actualQueueLength) {
count = actualQueueLength;
}
wDeficiency.push({ aigcPfName, info, count }); // 储存可进行任务处理的等待队列
logger.debug(`平台 ${aigcPfName} 满足处理条件,可处理 ${count} 个任务`);
} else {
logger.debug(`平台 ${aigcPfName} 不满足处理条件PQtasks < MAX_CONCURRENT = ${info.PQtasks < info.MAX_CONCURRENT}, 队列长度 > 0 = ${actualQueueLength > 0}`);
}
} catch (error) {
logger.error(`检查平台 ${aigcPfName} 队列长度失败:`, error);
}
}
return wDeficiency; // 返回可处理的队列列表
} catch (error) {
logger.error('判断并发数失败:', error);
return [];
}
}
// 批量获取等待队列任务的任务ID仅获取不移除
async function getBatchWaitTasksID(platforms) {
try {
const multi = redis.multi();
// 从等待队列批量获取任务ID但不立即移除
for(const platform of platforms) {
multi.lRange(platform.info.waitQueue, 0, platform.count - 1);
}
// 执行所有命令
const results = await multi.exec();
console.log('批量获取等待队列任务ID结果:', results);
// 将任务ID与处理队列关联
for(let i = 0; i < results.length; i++) {
const taskIDs = results[i] || [];
const platform = platforms[i];
platform.waitTaskID = taskIDs;
}
logger.debug('批量获取等待队列任务ID', platforms);
return platforms;
} catch (error) {
logger.error('批量获取等待队列任务ID失败:', error);
return platforms;
}
}
// 批量获取多个等待队列中的任务数据
async function getBatchWaitTasks(aigcPfTasks) {
const tasksData = [];
try {
// 收集所有需要获取的任务ID
const allTaskIds = [];
const taskIdMap = new Map(); // 用于映射任务ID到平台信息
for(const aigcPfTask of aigcPfTasks) {
for(const taskId of aigcPfTask.waitTaskID) {
if (taskId) {
allTaskIds.push(taskId);
taskIdMap.set(taskId, {
platformName: aigcPfTask.info.platformName,
aigc: aigcPfTask.info.AIGC,
aigcPfName: aigcPfTask.aigcPfName
});
}
}
}
if (allTaskIds.length === 0) {
return tasksData;
}
// 批量获取任务数据
const multi = redis.multi();
for(const taskId of allTaskIds) {
multi.hGetAll(`${initQueue.prefix}:task:${taskId}`);
}
const results = await multi.exec();
// 处理结果
for(let i = 0; i < results.length; i++) {
const taskInfo = results[i];
const taskId = allTaskIds[i];
const platformInfo = taskIdMap.get(taskId);
if (taskInfo) {
try {
tasksData.push({
backendId: taskInfo.backendId,
taskId: taskInfo.taskId, // 单个任务ID
platformName: platformInfo.platformName,
aigc: platformInfo.aigc,
aigcPfName: platformInfo.aigcPfName,
taskData: taskInfo.payload,
workflowId: taskInfo.workflowId || '',
});
// logger.debug(`已获取任务 ${taskId} 数据platform=${platformInfo.platformName}, aigc=${platformInfo.aigc}`);
} catch (error) {
logger.error(`解析任务${taskId}数据失败:`, error);
}
} else {
logger.warn(`任务 ${taskId} 数据不存在`);
}
}
// logger.debug('批量获取多个等待队列中的任务数据:', tasksData);
return tasksData;
} catch (error) {
logger.error('批量获取任务数据失败:', error);
return tasksData;
}
}
// 批量移除任务ID、更新等待队列计数并增加处理队列任务数
async function updateTaskCounts(wDeficiency) {
try {
const taskCountMap = new Map();
const multi = redis.multi();
// 1. 准备批量移除任务ID和更新计数
for(const aigcPfTask of wDeficiency) {
const { waitTaskID, info } = aigcPfTask;
const key = aigcPfTask.aigcPfName;
const count = waitTaskID.length;
if (count > 0) {
// 移除已处理的任务ID
multi.lTrim(info.waitQueue, count, -1);
// 更新计数映射
if(taskCountMap.has(key)) {
taskCountMap.set(key, taskCountMap.get(key) + count);
} else {
taskCountMap.set(key, count);
}
}
}
// 2. 执行Redis命令移除任务ID
await multi.exec();
// 3. 更新平台计数
if (taskCountMap.size > 0) {
// 减少平台等待队列的待处理任务数
await initQueue.reducePlatformsWait(taskCountMap);
// 增加平台处理队列的正在处理任务数
await initQueue.addPlatformsProcess(taskCountMap);
logger.debug('更新任务计数完成');
}
} catch (error) {
logger.error('更新任务计数失败:', error);
}
}
// 持续执行批量处理
(async () => {
while(true) {
try {
// 判断并发数,获取可进行任务处理的等待队列
const wDeficiency = await judgConcurrency();
// 判断是否有可处理的队列
if(wDeficiency.length > 0) {
// logger.debug('可进行任务处理的等待队列:', wDeficiency);
logger.info('有可进行处理的队列,数量: ' + wDeficiency.length);
console.log(wDeficiency);
// 批量获取多个等待队列中的任务ID
const tasksWithIds = await getBatchWaitTasksID(wDeficiency);
// 通过任务ID批量获取多个等待队列中的任务数据
const tasksData = await getBatchWaitTasks(tasksWithIds);
// 更新任务计数 - 无论是否获取到任务数据都需要更新计数因为任务ID已经从Redis队列中移除
await updateTaskCounts(tasksWithIds);
// 将任务发送给生成 Worker 处理
if (tasksData.length > 0) {
logger.info('发送任务给生成Worker处理数量: ' + tasksData.length);
generateWorker.postMessage(tasksData);
}
} else {
// 没有可处理的队列等待10秒后重试
await new Promise(resolve => setTimeout(resolve, 10000));
logger.debug('没有可处理的队列');
}
} catch (error) {
logger.error('批量处理任务失败:', error);
// 出错后等待5秒再重试
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
})();
// 监听 generateWorker 的消息
generateWorker.on('message', (message) => {
if (message.status === 'completed') {
logger.debug('等待任务处理完成');
}
});
generateWorker.on('error', (error) => {
logger.error('生成 Worker 错误:', error);
// 可以考虑重启Worker
});