import { parentPort } from 'worker_threads';
import redis from '../../redis/index.js';
import initQueue from '../../redis/initQueue.js';

// How long the polling loop sleeps when the error-queue counter is zero.
const IDLE_POLL_INTERVAL_MS = 15000;
// How long the polling loop sleeps after a failed iteration before retrying.
const ERROR_RETRY_DELAY_MS = 5000;

/**
 * Resolve after the given number of milliseconds.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Build one logger method that prefixes every message with an ISO timestamp
 * and a level label, then forwards to the matching `console` method.
 *
 * @param {'log'|'error'|'debug'|'warn'} method - Name of the `console` method to call.
 * @param {string} label - Level label printed after the timestamp.
 * @returns {(...args: unknown[]) => void}
 */
const createLogMethod = (method, label) => (...args) => {
  const timestamp = new Date().toISOString();
  // Look the console method up at call time, exactly as a direct call would.
  console[method](`[${timestamp}] ${label}:`, ...args);
};

// Logging helpers.
const logger = {
  info: createLogMethod('log', 'INFO'),
  error: createLogMethod('error', 'ERROR'),
  debug: createLogMethod('debug', 'DEBUG'),
  warn: createLogMethod('warn', 'WARN'),
};

/**
 * Drain the error list in one batch.
 *
 * Reads every task ID from `initQueue.errorList`, loads each task hash,
 * posts an `error` message to the parent thread for every task that has both
 * `taskId` and `resultData`, then removes every ID that was read (and its
 * hash) from Redis and lowers the related counters.
 *
 * If the list is empty while the counter reported by
 * `initQueue.getEQtaskALL()` is still positive, the counter is reduced by its
 * own value so it no longer claims pending work.
 *
 * @returns {Promise<boolean>} `true` when the batch completed, `false` when
 *   any step threw (the error is logged, not rethrown).
 */
async function getTasks() {
  try {
    const taskIds = await redis.lRange(initQueue.errorList, 0, -1);

    if (taskIds.length === 0) {
      // Nothing queued: correct a counter that still reports pending tasks.
      const currentCount = await initQueue.getEQtaskALL();
      if (currentCount > 0) {
        await initQueue.reduceEQtaskALL(currentCount);
        logger.info(`错误队列计数器修正: ${currentCount} -> 0`);
      }
      return true;
    }

    logger.debug('错误队列任务ID:', taskIds);

    // Fetch all task hashes in one round trip; results[i] belongs to taskIds[i].
    const multi = redis.multi();
    for (const taskId of taskIds) {
      multi.hGetAll(`${initQueue.prefix}:task:${taskId}`);
    }
    const results = await multi.exec();

    let processedCount = 0;
    // Number of forwarded tasks per "<AIGC>:<platform>" key.
    const taskCountMap = new Map();
    // Every ID read from the list is cleaned up, whether it was forwarded,
    // failed to forward, or had no usable data.
    const validTaskIds = [];

    for (let i = 0; i < taskIds.length; i++) {
      const taskId = taskIds[i];
      const taskInfo = results[i];
      validTaskIds.push(taskId);

      if (!(taskInfo && taskInfo.taskId && taskInfo.resultData)) {
        logger.warn(`错误任务信息不存在或无效,将清理: taskId=${taskId}`);
        continue;
      }

      try {
        logger.debug('错误队列任务数据:', taskInfo);
        const resultWithTaskId = {
          taskId: taskInfo.taskId,
          result: taskInfo.resultData,
        };
        parentPort.postMessage({
          type: 'error',
          backendId: taskInfo.backendId,
          message: JSON.stringify(resultWithTaskId),
        });
        processedCount++;

        const key = `${taskInfo.AIGC}:${taskInfo.platform}`;
        taskCountMap.set(key, (taskCountMap.get(key) ?? 0) + 1);
      } catch (parseError) {
        // Best effort: a task that cannot be forwarded is still cleaned up below.
        logger.error(`解析错误任务数据失败: ${taskInfo.resultData}`, parseError);
      }
    }

    if (validTaskIds.length > 0) {
      // Remove each ID from the list and delete its hash in a single batch.
      const deleteMulti = redis.multi();
      for (const taskId of validTaskIds) {
        deleteMulti.lRem(initQueue.errorList, 1, taskId);
        deleteMulti.del(`${initQueue.prefix}:task:${taskId}`);
      }
      await deleteMulti.exec();

      await initQueue.reduceEQtaskALL(validTaskIds.length);
      logger.info(`处理了 ${processedCount} 个错误任务,清理了 ${validTaskIds.length} 个任务记录`);

      if (taskCountMap.size > 0) {
        await initQueue.reducePlatformsWait(taskCountMap);
      }
    }

    return true;
  } catch (error) {
    logger.error('获取错误任务失败:', error);
    return false;
  }
}

// Poll the error queue forever, processing a batch whenever the counter is positive.
(async () => {
  while (true) {
    try {
      const errorTasksCount = await initQueue.getEQtaskALL();

      if (errorTasksCount > 0) {
        logger.info('错误队列有可处理任务,数量:', errorTasksCount);
        const succeeded = await getTasks();
        if (!succeeded) {
          // getTasks() swallows its own errors and returns false. Without a
          // pause here a persistent failure would spin this loop at full
          // speed, because the counter stays above zero.
          await sleep(ERROR_RETRY_DELAY_MS);
        }
      } else {
        // Nothing to process: wait before polling again.
        await sleep(IDLE_POLL_INTERVAL_MS);
      }
    } catch (error) {
      logger.error('持续处理错误任务失败:', error);
      // Back off before retrying after an unexpected failure.
      await sleep(ERROR_RETRY_DELAY_MS);
    }
  }
})();