import { parentPort } from 'worker_threads' import initQueue from '../../redis/initQueue.js' import redis from '../../redis/index.js' import dotenv from 'dotenv' // 配置 dotenv 加载环境变量 dotenv.config() // 日志工具函数 const logger = { info: (message) => { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] INFO: ${message}`); }, error: (message, error) => { const timestamp = new Date().toISOString(); console.error(`[${timestamp}] ERROR: ${message}`, error || ''); }, debug: (message) => { const timestamp = new Date().toISOString(); console.debug(`[${timestamp}] DEBUG: ${message}`); } }; console.log('***********************************预处理线程启动成功**********************************************') let id // 参数检查函数 function validateTaskParams(task) { // 定义必填参数列表 const requiredParams = [ 'taskId', 'platform', 'payload' ]; // 检查task是否存在 if (!task) { return { valid: false, message: '缺少必填参数: task' }; } // 检查每个必填参数 for (const param of requiredParams) { if (param === 'taskType') { // taskType可以是0,所以需要特殊处理 if (task[param] === undefined || task[param] === null) { return { valid: false, message: `缺少必填参数: task.${param}` }; } } else { if (!task[param]) { return { valid: false, message: `缺少必填参数: task.${param}` }; } } } return { valid: true, message: '参数检查通过' }; } // 处理任务信息 function handleTask(data,backendId) { // console.log('data:', data, backendId); const task = { backendId: backendId, AIGC: process.env.PROJECT_PREFIX, // AIGC名称 { digitalHuman(数字人) } platform: data.platform, taskId: data.taskId, payload: data.payload, // 任务参数 workflowId: data.workflowId? data.workflowId : '', // 工作流ID status: 'pending', resultData: null } return task } // 提交任务 async function storeTask(task) { const waitName = initQueue.toQueue(task.AIGC, task.platform, 'wait') // 判断任务所属队列 // 将任务存储到 Hash 中,便于通过 任务ID(taskId) 查询,使用项目前缀 const taskKey = `${initQueue.prefix}:task:${task.taskId}`; const multi = redis.multi() // 1. 将任务存储到 Hash 中,使用键值对形式 multi.hSet(taskKey, 'taskId', task.taskId); // console.log('taskKey:', taskKey); multi.hSet(taskKey, 'payload', JSON.stringify(task.payload)); multi.hSet(taskKey, 'backendId', task.backendId); multi.hSet(taskKey, 'AIGC', task.AIGC); multi.hSet(taskKey, 'platform', task.platform); multi.hSet(taskKey, 'status', task.status); // 存储workflowId multi.hSet(taskKey, 'workflowId', task.workflowId || ''); // 2. 将任务ID添加到处理队列中(List结构) multi.rPush(waitName, task.taskId); // 3. 设置任务的过期时间为2小时(7200秒) multi.expire(taskKey, 7200); await multi.exec(); // 增加平台相关信息等待队列的任务数 initQueue.addPlatformsWait(task.AIGC, task.platform, 1) logger.info(`任务已加入排队队列:${task.taskId},并设置了2小时过期时间`); } // 处理任务数据 async function pre_deducted_fee (task, backendId) { // console.log('task:', task); // 调用参数检查函数 const validationResult = validateTaskParams(task); if (!validationResult.valid) { console.error('任务参数检查失败:', validationResult.message); parentPort.postMessage({ type: 'error', id: id, backendId: backendId, data: validationResult.message }); return; } const taskinfo = handleTask(task, backendId); await storeTask(taskinfo); // 使用新的存储方法 } // 启动处理循环 function startProcessingLoop() { setInterval(async () => { const queues = await redis.lPop(`${initQueue.prefix}:assessment:${id}`, 100) if (queues) { // console.log('队列非空,开始提交任务...'); let tasks = []; // 处理返回值:可能是字符串或数组 if (Array.isArray(queues)) { // 如果是数组,直接使用 tasks = queues; } else if (typeof queues === 'string') { // 如果是字符串,放入数组中 tasks = [queues]; } const promises = tasks.map(async (task) => { let taskObj; try { taskObj = JSON.parse(task); // 检查taskObj.data是否存在 // console.log('taskObj:', taskObj); if (!taskObj || !taskObj.data) { throw new Error('无效的任务数据格式'); } await pre_deducted_fee(taskObj.data, taskObj.backendId); } catch (error) { console.error('单个任务处理失败:', error); // 继续处理其他任务 parentPort.postMessage({ type: 'error', id: id, backendId: taskObj.backendId, data: '任务处理失败,请稍后再试。' }); } }); await Promise.all(promises); } else { await new Promise(resolve => setTimeout(resolve, 10000)); } }, 500); } parentPort.on('message', async (message) => { id = message.id parentPort.postMessage({ type: 'ok', id: id }); // 启动处理循环 startProcessingLoop(); })