148 lines
4.9 KiB
JavaScript
148 lines
4.9 KiB
JavaScript
import { parentPort } from 'worker_threads'
|
||
import redis from '../../redis/index.js'
|
||
import initQueue from '../../redis/initQueue.js'
|
||
|
||
// 日志工具函数
|
||
const logger = {
|
||
info: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.log(`[${timestamp}] INFO: ${message}`);
|
||
},
|
||
error: (message, error) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
|
||
},
|
||
debug: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.debug(`[${timestamp}] DEBUG: ${message}`);
|
||
}
|
||
};
|
||
|
||
// 批量获取处理任务的信息
|
||
async function getTasks() {
|
||
try {
|
||
// 从回调结果队列列表获取最多50个数据
|
||
const taskIds = await redis.lRange(initQueue.callback, 0, 49);
|
||
const taskCountMap = new Map()
|
||
|
||
if (taskIds.length === 0) {
|
||
// 如果没有任务ID,强制将回调队列任务数设置为0
|
||
redis.json.set(initQueue.initInfoKey, '$.CQtasksALL', 0);
|
||
logger.debug('回调结果队列为空,已重置任务数为0');
|
||
return [];
|
||
}
|
||
|
||
logger.debug('回调结果队列任务ID:', taskIds);
|
||
|
||
// 批量获取任务的backendId和resultData字段
|
||
const multi = redis.multi();
|
||
for (const taskId of taskIds) {
|
||
// 只获取需要的字段
|
||
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'backendId');
|
||
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'resultData');
|
||
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'AIGC');
|
||
multi.hGet(`${initQueue.prefix}:task:${taskId}`, 'platform');
|
||
}
|
||
|
||
const results = await multi.exec();
|
||
|
||
// 发送结果给客户端
|
||
const processedTaskIds = [];
|
||
for (let i = 0; i < taskIds.length; i++) {
|
||
// 从结果数组中提取对应字段(每4个结果对应一个任务)
|
||
const backendId = results[i * 4] || '';
|
||
const resultData = results[i * 4 + 1] || '';
|
||
const aigc = results[i * 4 + 2] || 'default';
|
||
const platform = results[i * 4 + 3] || 'default';
|
||
const taskId = taskIds[i];
|
||
|
||
if (backendId) {
|
||
try {
|
||
// 直接打包结果和任务ID,发送给主线程
|
||
const resultWithTaskId = {
|
||
taskId: taskId,
|
||
result: resultData
|
||
};
|
||
|
||
parentPort.postMessage({
|
||
type: 'success',
|
||
backendId: backendId,
|
||
message: JSON.stringify(resultWithTaskId)
|
||
});
|
||
|
||
logger.debug(`成功发送结果给客户端,taskId: ${taskId}`);
|
||
|
||
// 统计需要减少的平台任务数
|
||
const key = `${aigc}:${platform}`;
|
||
console.log('key:', key);
|
||
if(taskCountMap.has(key)){
|
||
taskCountMap.set(key, taskCountMap.get(key) + 1);
|
||
} else {
|
||
taskCountMap.set(key, 1);
|
||
}
|
||
processedTaskIds.push(taskId);
|
||
} catch (parseError) {
|
||
logger.error(`发送结果给客户端失败: ${taskId}`, parseError);
|
||
}
|
||
}
|
||
}
|
||
|
||
// 只有在成功发送结果后才执行后续操作
|
||
if (processedTaskIds.length > 0) {
|
||
// 使用原子操作执行多项任务
|
||
const multi = redis.multi();
|
||
|
||
// 1. 移除已获取的任务ID
|
||
multi.lTrim(initQueue.callback, processedTaskIds.length, -1);
|
||
|
||
// 2. 执行所有Redis操作
|
||
await multi.exec();
|
||
|
||
// 3. 更新平台任务数(如果有需要更新的任务)
|
||
if (taskCountMap.size > 0) {
|
||
await initQueue.reducePlatformsProcess(taskCountMap);
|
||
}
|
||
|
||
// 4. 更新回调队列任务数
|
||
await initQueue.reduceCQtasksALL(processedTaskIds.length);
|
||
|
||
logger.debug(`已处理${processedTaskIds.length}个回调结果任务,发送结果后结束`);
|
||
}
|
||
|
||
return taskIds;
|
||
} catch (error) {
|
||
logger.error('处理回调结果任务失败:', error);
|
||
// 出错时,强制将回调队列任务数设置为0,避免死循环
|
||
await redis.json.set(initQueue.initInfoKey, '$.CQtasksALL', 0);
|
||
return [];
|
||
}
|
||
}
|
||
|
||
// 持续执行批量处理
|
||
(async () => {
|
||
while (true) {
|
||
try {
|
||
// 判断结果队列是否有可发送的任务
|
||
const rqTasksAll = await initQueue.getCQtasksALL();
|
||
|
||
|
||
if (rqTasksAll !== 0) {
|
||
logger.info('回调结果队列有任务可处理,数量:', rqTasksAll);
|
||
// logger.debug('回调结果队列任务数量:', rqTasksAll);
|
||
// 先处理任务,再减少队列任务数
|
||
await getTasks(); // 处理结果队列的任务
|
||
|
||
// 添加延迟,避免高频率执行
|
||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||
} else {
|
||
// 回调结果队列无任务可处理,等待10秒后重试
|
||
await new Promise(resolve => setTimeout(resolve, 10000));
|
||
}
|
||
|
||
} catch (error) {
|
||
logger.error('持续处理回调结果任务失败:', error);
|
||
// 出错后等待5秒再重试
|
||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||
}
|
||
}
|
||
})() |