shuzhiren-comfyui/任务队列后端/worker_threads/callback_result/result.js

148 lines
4.9 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { parentPort } from 'worker_threads'
import redis from '../../redis/index.js'
import initQueue from '../../redis/initQueue.js'
// 日志工具函数
const logger = {
info: (message) => {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] INFO: ${message}`);
},
error: (message, error) => {
const timestamp = new Date().toISOString();
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
},
debug: (message) => {
const timestamp = new Date().toISOString();
console.debug(`[${timestamp}] DEBUG: ${message}`);
}
};
// 批量获取处理任务的信息
async function getTasks() {
try {
// 从回调结果队列列表获取最多50个数据
const taskIds = await redis.lRange(initQueue.callback, 0, 49);
const taskCountMap = new Map()
if (taskIds.length === 0) {
// 如果没有任务ID强制将回调队列任务数设置为0
redis.json.set(initQueue.initInfoKey, '$.CQtasksALL', 0);
logger.debug('回调结果队列为空已重置任务数为0');
return [];
}
logger.debug('回调结果队列任务ID:', taskIds);
// 批量获取任务的backendId和resultData字段
const multi = redis.multi();
for (const taskId of taskIds) {
// 只获取需要的字段
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'backendId');
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'resultData');
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'AIGC');
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'platform');
}
const results = await multi.exec();
// 发送结果给客户端
const processedTaskIds = [];
for (let i = 0; i < taskIds.length; i++) {
// 从结果数组中提取对应字段每4个结果对应一个任务
const backendId = results[i * 4] || '';
const resultData = results[i * 4 + 1] || '';
const aigc = results[i * 4 + 2] || 'default';
const platform = results[i * 4 + 3] || 'default';
const taskId = taskIds[i];
if (backendId) {
try {
// 直接打包结果和任务ID发送给主线程
const resultWithTaskId = {
taskId: taskId,
result: resultData
};
parentPort.postMessage({
type: 'success',
backendId: backendId,
message: JSON.stringify(resultWithTaskId)
});
logger.debug(`成功发送结果给客户端taskId: ${taskId}`);
// 统计需要减少的平台任务数
const key = `${aigc}:${platform}`;
console.log('key:', key);
if(taskCountMap.has(key)){
taskCountMap.set(key, taskCountMap.get(key) + 1);
} else {
taskCountMap.set(key, 1);
}
processedTaskIds.push(taskId);
} catch (parseError) {
logger.error(`发送结果给客户端失败: ${taskId}`, parseError);
}
}
}
// 只有在成功发送结果后才执行后续操作
if (processedTaskIds.length > 0) {
// 使用原子操作执行多项任务
const multi = redis.multi();
// 1. 移除已获取的任务ID
multi.lTrim(initQueue.callback, processedTaskIds.length, -1);
// 2. 执行所有Redis操作
await multi.exec();
// 3. 更新平台任务数(如果有需要更新的任务)
if (taskCountMap.size > 0) {
await initQueue.reducePlatformsProcess(taskCountMap);
}
// 4. 更新回调队列任务数
await initQueue.reduceCQtasksALL(processedTaskIds.length);
logger.debug(`已处理${processedTaskIds.length}个回调结果任务,发送结果后结束`);
}
return taskIds;
} catch (error) {
logger.error('处理回调结果任务失败:', error);
// 出错时强制将回调队列任务数设置为0避免死循环
await redis.json.set(initQueue.initInfoKey, '$.CQtasksALL', 0);
return [];
}
}
// 持续执行批量处理
(async () => {
while (true) {
try {
// 判断结果队列是否有可发送的任务
const rqTasksAll = await initQueue.getCQtasksALL();
if (rqTasksAll !== 0) {
logger.info('回调结果队列有任务可处理,数量:', rqTasksAll);
// logger.debug('回调结果队列任务数量:', rqTasksAll);
// 先处理任务,再减少队列任务数
await getTasks(); // 处理结果队列的任务
// 添加延迟,避免高频率执行
await new Promise(resolve => setTimeout(resolve, 1000));
} else {
// 回调结果队列无任务可处理等待10秒后重试
await new Promise(resolve => setTimeout(resolve, 10000));
}
} catch (error) {
logger.error('持续处理回调结果任务失败:', error);
// 出错后等待5秒再重试
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
})()