122 lines
3.7 KiB
JavaScript
122 lines
3.7 KiB
JavaScript
import { parentPort } from 'worker_threads'
|
||
import redis from '../../redis/index.js'
|
||
import initQueue from '../../redis/initQueue.js';
|
||
|
||
// 日志工具函数
|
||
const logger = {
|
||
info: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.log(`[${timestamp}] INFO: ${message}`);
|
||
},
|
||
error: (message, error) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
|
||
},
|
||
debug: (message) => {
|
||
const timestamp = new Date().toISOString();
|
||
console.debug(`[${timestamp}] DEBUG: ${message}`);
|
||
}
|
||
};
|
||
|
||
// 批量获取处理任务的信息
|
||
async function getTasks() {
|
||
const tasks = [];
|
||
const processedTaskIds = [];
|
||
const taskCountMap = new Map()
|
||
try {
|
||
// 1. 首先获取所有任务ID
|
||
const taskIDs = await redis.lRange(initQueue.resultList, 0, -1);
|
||
|
||
if (taskIDs.length === 0) {
|
||
return tasks;
|
||
}
|
||
|
||
// 2. 批量获取完整任务信息
|
||
const taskInfoPromises = [];
|
||
for (const taskID of taskIDs) {
|
||
taskInfoPromises.push(redis.hGetAll(`${initQueue.prefix}:task:${taskID}`));
|
||
}
|
||
|
||
const taskInfos = await Promise.all(taskInfoPromises);
|
||
|
||
// 3. 处理结果
|
||
for (let i = 0; i < taskIDs.length; i++) {
|
||
const taskId = taskIDs[i];
|
||
const taskInfo = taskInfos[i];
|
||
|
||
if (taskInfo && taskInfo.taskId && taskInfo.resultData) {
|
||
try {
|
||
// 解析JSON格式的字段
|
||
const task = {
|
||
taskId: taskInfo.taskId,
|
||
status: taskInfo.status,
|
||
resultData: taskInfo.resultData,
|
||
token: taskInfo.token
|
||
};
|
||
|
||
// 直接打包结果和任务ID,发送给主线程
|
||
const resultWithTaskId = {
|
||
taskId: task.taskId,
|
||
result: task.resultData
|
||
};
|
||
|
||
parentPort.postMessage({
|
||
type: 'success',
|
||
backendId: taskInfo.backendId,
|
||
message: JSON.stringify(resultWithTaskId)
|
||
});
|
||
|
||
tasks.push(task);
|
||
processedTaskIds.push(taskId);
|
||
} catch (parseError) {
|
||
logger.error(`解析任务结果失败: ${taskInfo.resultData}`, parseError);
|
||
}
|
||
}
|
||
}
|
||
|
||
// 4. 只删除已成功处理的任务结果
|
||
if (processedTaskIds.length > 0) {
|
||
const deleteMulti = redis.multi();
|
||
for (const taskId of processedTaskIds) {
|
||
deleteMulti.lRem(initQueue.resultList, 1, taskId);
|
||
}
|
||
await deleteMulti.exec();
|
||
|
||
// 5. 更新结果队列计数
|
||
// 从结果队列中减少任务数
|
||
try {
|
||
await redis.json.numIncrBy(initQueue.initInfoKey || 'default:InitInfo', '$.RQtasksALL', -processedTaskIds.length);
|
||
logger.debug(`减少结果队列任务数: ${processedTaskIds.length}`);
|
||
} catch (error) {
|
||
logger.error('减少结果队列任务数失败:', error);
|
||
}
|
||
}
|
||
|
||
} catch (error) {
|
||
logger.error('获取任务结果失败:', error);
|
||
}
|
||
|
||
return tasks;
|
||
}
|
||
|
||
// 持续执行批量处理
|
||
(async () => {
|
||
while (true) {
|
||
try {
|
||
// 直接检查结果列表长度,而不是依赖计数
|
||
const resultListLength = await redis.lLen(initQueue.resultList);
|
||
if (resultListLength > 0) {
|
||
logger.info('结果队列有可处理任务,数量: ' + resultListLength);
|
||
await getTasks(); // 处理结果队列的任务,发送结果后结束
|
||
} else {
|
||
// 结果队列为空,等待15秒后重试
|
||
await new Promise(resolve => setTimeout(resolve, 15000));
|
||
}
|
||
} catch (error) {
|
||
logger.error('处理结果任务时出错:', error);
|
||
// 出错后等待5秒再重试
|
||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||
}
|
||
}
|
||
})()
|