import dotenv from 'dotenv' import WebSocket, { WebSocketServer } from 'ws' import { Worker } from 'worker_threads' import { checkUsertoken } from './school/api.js' import redis from './redis/index.js' import initQueue from './redis/initQueue.js' import messagePersistence from './redis/messagePersistence.js' import code from './config/code.json' with { type: 'json' } // 配置 dotenv 加载环境变量 dotenv.config() // 日志工具函数 const logger = { info: (message) => { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] INFO: ${message}`); }, error: (message, error) => { const timestamp = new Date().toISOString(); console.error(`[${timestamp}] ERROR: ${message}`, error || ''); }, debug: (message) => { const timestamp = new Date().toISOString(); console.debug(`[${timestamp}] DEBUG: ${message}`); } }; let wss = null; const workers = []; // 初始化函数 async function initialize() { logger.info('***************初始化队列开始***************'); try { // 确保 Redis 连接后再初始化队列 if (!redis.isOpen) { await redis.connect(); logger.info('Redis 连接成功'); } await initQueue.init(); logger.info('***************初始化队列完成***************'); // 创建 WebSocket 服务器 createWebSocketServer(); // 启动定期清理过期消息的任务(每天执行一次) setInterval(async () => { try { await messagePersistence.cleanupOldMessages(2 * 24 * 60 * 60 * 1000); // 清理7天前的消息 } catch (error) { logger.error('定期清理过期消息失败:', error); } }, 24 * 60 * 60 * 1000); // 每24小时执行一次 } catch (err) { logger.error('初始化失败:', err); process.exit(1); // 初始化失败退出进程,让进程管理器重启 } } const socketMap = new Map(); // 创建并管理worker线程 function createWorker(scriptPath) { const worker = new Worker(scriptPath); worker.setMaxListeners(20); worker.on('error', (error) => { logger.error(`Worker ${scriptPath} 错误:`, error); }); worker.on('exit', (code) => { if (code !== 0) { logger.error(`Worker ${scriptPath} 异常退出,退出码: ${code}`); // 可以考虑重启worker } }); workers.push(worker); return worker; } // 初始化所有worker线程 const assessment = createWorker('./worker_threads/assessment/assessment.js'); const wait = createWorker('./worker_threads/wait/waiting.js'); const polling = createWorker('./worker_threads/process/process.js'); const result = createWorker('./worker_threads/result/result.js'); const callback_result = createWorker('./worker_threads/callback_result/result.js'); const error = createWorker('./worker_threads/error/error.js'); // 发送消息给客户端的工具函数 async function sendMessageToClient(id, message, close = false, closeCode = 1000, closeReason = '') { let socket; // 尝试通过id查找socket,id可能是taskId或backendId if (typeof id === 'string' && id) { socket = socketMap.get(id); } if (socket && socket.readyState === WebSocket.OPEN && message) { try { socket.send(message); const messagePreview = typeof message === 'string' ? message.slice(0, 50) : JSON.stringify(message).slice(0, 50); logger.debug(`成功发送消息到客户端,id: ${id}, 消息: ${messagePreview}...`); if (close) { socket.close(closeCode, closeReason); } return true; } catch (error) { logger.error(`发送消息给客户端失败,id: ${id}`, error); return false; } } else { if (!message) { logger.debug(`消息为空,无法发送,id: ${id}`); return false; } else { logger.debug(`未找到目标客户端或连接已关闭,保存消息到待发送队列,id: ${id}`); try { await messagePersistence.savePendingMessage(id, message); logger.info(`消息已保存到待发送队列,等待重试: backendId=${id}`); return false; } catch (error) { logger.error(`保存待发送消息失败: backendId=${id}`, error); return false; } } } } // 创建 WebSocket 服务器函数 function createWebSocketServer() { wss = new WebSocketServer({ port: process.env.WS_PORT || 8087, verifyClient: async (info, callback) => { try { const urlParams = new URLSearchParams(info.req.url.split('?')[1]); const token = urlParams.get('token'); const id = urlParams.get('id'); if (!token) { logger.info('缺少令牌'); callback(false, 401, '缺少令牌'); return; } else if (token !== process.env.TOKEN_SECRET){ logger.info('验证后端失败'); callback(false, 401, 'Token is invalid'); return; } info.req.id = id; logger.info(`用户ID: token 验证成功`); callback(true); } catch (error) { logger.error('验证后端失败:', error); callback(false, 401, 'Token is invalid'); } } }); // 日志显示WebSocket服务器端口 logger.info(`WebSocket server is running on port: ${process.env.WS_PORT || 8087}`); // 添加服务器错误处理 wss.on('error', (error) => { logger.error('WebSocket服务器错误:', error); }); // 当有客户端连接时触发 wss.on('connection', async (socket, req) => { const id = req.id; logger.info(`${id}号后端 连接成功`); socketMap.set(id, socket); // 连接成功后,只发送一条请求taskId的消息 socket.send('please give me tasks'); // 重试发送之前未发送的消息 try { const pendingMessages = await messagePersistence.getPendingMessages(id); if (pendingMessages.length > 0) { logger.info(`${id}号后端 发现 ${pendingMessages.length} 条待发送消息,开始重试发送`); for (const pendingMsg of pendingMessages) { try { socket.send(pendingMsg.message); await messagePersistence.removePendingMessage(pendingMsg.key); logger.debug(`成功重试发送消息: backendId=${id}, messageKey=${pendingMsg.key}`); } catch (error) { logger.error(`重试发送消息失败: backendId=${id}, messageKey=${pendingMsg.key}`, error); await messagePersistence.incrementRetryCount(pendingMsg.key); } } logger.info(`${id}号后端 待发送消息重试完成`); } } catch (error) { logger.error(`获取或发送待发送消息失败: backendId=${id}`, error); } // 处理收到的消息 socket.on('message', (message) => { const messageStr = typeof message === 'string' ? message : message.toString(); // 首先检查是否为心跳消息 if (messageStr === 'ping') { socket.send('pong'); // 回复心跳 return; } try { // 只检查前面100个字符是否包含 `"type": "generate"`,提高大消息处理性能 const prefix = messageStr.slice(0, 50); if (prefix.includes('"type":"generate"') || prefix.includes("'type':'generate'")) { // 在此处添加处理消息的逻辑 assessment.postMessage({ type: 'submit', data: messageStr }); } else { // 记录日志,不关闭连接 logger.debug(`收到未知消息类型: ${prefix}`); } } catch (e) { logger.error('处理消息出错:', e); // 发送错误消息,不关闭连接 socket.send(JSON.stringify({ error: '处理消息出错', details: e.message })); } }); // 定期发送心跳 const heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send('ping'); logger.debug(`向 ${id} 号后端发送心跳`); } }, 30000); // 每30秒发送一次心跳 // 处理连接关闭 socket.on('close', (code, reason) => { // 清理心跳定时器 clearInterval(heartbeatInterval); logger.info(`${id}号后端 连接关闭,关闭码: ${code},原因: ${reason}`); }); // 处理连接错误 socket.on('error', (error) => { logger.error(`${id}号后端 连接错误:`, error); // 不关闭连接,尝试继续通信 }); }); // 任务检验的工作线程响应处理 assessment.on('message', async (message) => { logger.debug(`收到assessment worker消息: ${JSON.stringify(message)}`); if (message.type === 'AssessmentSuccess') { await sendMessageToClient(message.backendId, code.SUCCESS[message.type]); } else { await sendMessageToClient(message.backendId, code.ERROR[message.type], false, 4401, code.ERROR[message.type]); } }); // 获取结果线程响应处理 result.on('message', async (message) => { logger.debug(`收到result worker消息: ${JSON.stringify(message)}`); if (message.type === 'success') { await sendMessageToClient(message.backendId, message.message, false, 1000, 'success'); } else { await sendMessageToClient(message.backendId, '获取结果失败,可在历史记录区刷新查看结果', false, 4401, code.ERROR[message.type]); } }); // 获取回调结果线程响应处理 callback_result.on('message', async (message) => { logger.debug(`收到callback_result worker消息: ${JSON.stringify(message)}`); if (message.type === 'success') { await sendMessageToClient(message.backendId, message.message, false, 1000, 'success'); } else { await sendMessageToClient(message.backendId, '获取结果失败,可在历史记录区刷新查看结果', false, 4401); } }); error.on('message', async (message) => { logger.debug(`收到error worker消息: ${JSON.stringify(message)}`); await sendMessageToClient(message.backendId, message.message, false, 4402, 'false'); }); } // 优雅关闭机制 function gracefulShutdown() { logger.info('开始优雅关闭...'); // 关闭WebSocket服务器,拒绝新连接 if (wss) { wss.close(() => { logger.info('WebSocket服务器已关闭'); }); // 关闭所有现有连接 wss.clients.forEach((client) => { client.close(1001, '服务器正在关闭'); }); } // 终止所有worker线程 workers.forEach((worker, index) => { logger.info(`终止worker线程 ${index}`); worker.terminate(); }); // 关闭Redis连接 if (redis.isOpen) { redis.disconnect() .then(() => { logger.info('Redis连接已关闭'); process.exit(0); }) .catch((error) => { logger.error('关闭Redis连接失败:', error); process.exit(1); }); } else { process.exit(0); } } // 监听终止信号 process.on('SIGINT', gracefulShutdown); process.on('SIGTERM', gracefulShutdown); // 启动服务器 initialize();