/**
 * Redis queue health-check script.
 *
 * Connects to Redis and inspects, for the project identified by PROJECT_PREFIX:
 *   - the per-platform wait queues,
 *   - the process / result / error lists,
 *   - the callback-pending hash (flagging entries older than CALLBACK_TIMEOUT),
 *   - the pending-message list,
 *   - the counters stored in the RedisJSON document `${PREFIX}:InitInfo`,
 *     and whether they agree with the actual list/hash sizes.
 * Every check returns an array of issue objects; `generateReport` groups and prints them.
 *
 * Environment: REDIS_URL, REDIS_PASSWORD, PROJECT_PREFIX, CALLBACK_TIMEOUT (ms).
 */
import { createClient } from 'redis';
import dotenv from 'dotenv';

dotenv.config();

// NOTE(review): `enableReadyCheck` and `maxRetriesPerRequest` look like ioredis options;
// node-redis presumably ignores them — confirm against the installed client version.
// NOTE(review): the hard-coded fallback password is a credential committed to source;
// prefer requiring REDIS_PASSWORD to be set explicitly.
const redis = createClient({
  RESP: 3,
  url: process.env.REDIS_URL || 'redis://localhost:16379',
  password: process.env.REDIS_PASSWORD || '654321',
  socket: {
    connectTimeout: 10000,
    keepAlive: 30000,
  },
  legacyMode: false,
  enableReadyCheck: true,
  maxRetriesPerRequest: 3,
});

redis.on('error', (err) => {
  console.error('Redis 连接错误:', err);
});

const PREFIX = process.env.PROJECT_PREFIX || 'digitalHuman-v3';
// Age in milliseconds after which a callback-pending task is reported as timed out (default 1 hour).
const CALLBACK_TIMEOUT = Number.parseInt(process.env.CALLBACK_TIMEOUT, 10) || 3600000;

const QUEUE_NAMES = {
  initInfo: `${PREFIX}:InitInfo`,
  processPolling: `${PREFIX}:process:Polling`,
  processCallback: `${PREFIX}:process:callback`,
  resultQueue: `${PREFIX}:result:queue`,
  resultList: `${PREFIX}:result:list`,
  callback: `${PREFIX}:callback`,
  errorQueue: `${PREFIX}:error:queue`,
  errorList: `${PREFIX}:error:list`,
  pendingMessages: `${PREFIX}:pending:messages`,
  callbackPending: `${PREFIX}:callback:pending`,
};

const AIGC_TYPES = ['digitalHuman-v3'];
const PLATFORMS = ['comfyui', 'runninghub', 'coze'];

// Number of pending messages whose details are sampled into the report.
const PENDING_SAMPLE_SIZE = 10;

// ANSI escape codes used by `log`.
const COLORS = {
  reset: '\x1b[0m',
  red: '\x1b[31m',
  green: '\x1b[32m',
  yellow: '\x1b[33m',
  blue: '\x1b[34m',
  cyan: '\x1b[36m',
};

/**
 * Prints `args` wrapped in the ANSI color named by `color`.
 * @param {keyof COLORS} color - Key into COLORS.
 * @param {...*} args - Values forwarded to console.log.
 */
function log(color, ...args) {
  console.log(COLORS[color], ...args, COLORS.reset);
}

/**
 * Builds the Redis key of a wait queue.
 * @param {string} aigc - AIGC type (an entry of AIGC_TYPES).
 * @param {string} platform - Platform name (an entry of PLATFORMS).
 * @returns {string} `${aigc}:${platform}:wait`
 */
function getWaitQueueName(aigc, platform) {
  return `${aigc}:${platform}:wait`;
}

/**
 * Reports every AIGC-type × platform wait queue that is non-empty.
 * @returns {Promise<object[]>} Issues of type `wait_queue_backlog`.
 */
async function checkWaitQueues() {
  log('cyan', '\n========== 检测等待队列 ==========');
  const issues = [];
  for (const aigc of AIGC_TYPES) {
    for (const platform of PLATFORMS) {
      const queueName = getWaitQueueName(aigc, platform);
      try {
        const length = await redis.lLen(queueName);
        if (length > 0) {
          log('yellow', `⚠️ 等待队列 [${queueName}] 有 ${length} 个任务积压`);
          issues.push({ type: 'wait_queue_backlog', queue: queueName, count: length });
        } else {
          log('green', `✓ 等待队列 [${queueName}] 为空`);
        }
      } catch (error) {
        log('red', `✗ 检测等待队列 [${queueName}] 失败:`, error.message);
      }
    }
  }
  return issues;
}

/**
 * Checks the length of one Redis list and records an issue when it is non-empty.
 * Errors are logged rather than thrown so the remaining checks still run.
 * @param {string} label - Display name of the list used in log lines.
 * @param {string} queue - Redis key of the list.
 * @param {string} type - Issue type recorded when the list is non-empty.
 * @param {object[]} issues - Array the `{ type, queue, count }` issue is appended to.
 */
async function checkListEmpty(label, queue, type, issues) {
  try {
    const length = await redis.lLen(queue);
    if (length > 0) {
      log('yellow', `⚠️ ${label} [${queue}] 有 ${length} 个任务`);
      issues.push({ type, queue, count: length });
    } else {
      log('green', `✓ ${label} [${queue}] 为空`);
    }
  } catch (error) {
    log('red', `✗ 检测${label}失败:`, error.message);
  }
}

/**
 * Reports non-empty polling and callback process lists.
 * @returns {Promise<object[]>} Issues of type `process_queue_has_tasks` / `callback_queue_has_tasks`.
 */
async function checkProcessQueues() {
  log('cyan', '\n========== 检测处理队列 ==========');
  const issues = [];
  await checkListEmpty('处理队列', QUEUE_NAMES.processPolling, 'process_queue_has_tasks', issues);
  await checkListEmpty('回调处理队列', QUEUE_NAMES.processCallback, 'callback_queue_has_tasks', issues);
  return issues;
}

/**
 * Reports non-empty result queue and result list.
 * @returns {Promise<object[]>} Issues of type `result_queue_has_tasks` / `result_list_has_tasks`.
 */
async function checkResultQueues() {
  log('cyan', '\n========== 检测结果队列 ==========');
  const issues = [];
  await checkListEmpty('结果队列', QUEUE_NAMES.resultQueue, 'result_queue_has_tasks', issues);
  await checkListEmpty('结果列表', QUEUE_NAMES.resultList, 'result_list_has_tasks', issues);
  return issues;
}

/**
 * Reports non-empty error queue and error list.
 * @returns {Promise<object[]>} Issues of type `error_queue_has_tasks` / `error_list_has_tasks`.
 */
async function checkErrorQueues() {
  log('cyan', '\n========== 检测错误队列 ==========');
  const issues = [];
  await checkListEmpty('错误队列', QUEUE_NAMES.errorQueue, 'error_queue_has_tasks', issues);
  await checkListEmpty('错误列表', QUEUE_NAMES.errorList, 'error_list_has_tasks', issues);
  return issues;
}

/**
 * Reports callback-pending tasks older than CALLBACK_TIMEOUT.
 * Hash values are parsed as JSON; `createdAt` is assumed to be an epoch-millisecond
 * timestamp (TODO confirm against the producer).
 * @returns {Promise<object[]>} At most one issue of type `callback_pending_timeout`.
 */
async function checkCallbackPending() {
  log('cyan', '\n========== 检测回调等待任务 ==========');
  const issues = [];
  try {
    const tasks = await redis.hGetAll(QUEUE_NAMES.callbackPending);
    const taskEntries = Object.entries(tasks);
    if (taskEntries.length === 0) {
      log('green', `✓ 没有回调等待任务`);
      return issues;
    }

    log('blue', `发现 ${taskEntries.length} 个回调等待任务`);
    const now = Date.now();
    const timeoutTasks = [];
    for (const [remoteTaskId, taskJson] of taskEntries) {
      try {
        const task = JSON.parse(taskJson);
        const age = now - task.createdAt;
        // A missing or non-numeric createdAt yields NaN, which would otherwise never time out.
        if (Number.isNaN(age)) {
          log('yellow', `⚠️ 任务缺少有效的创建时间: ${remoteTaskId}`);
          continue;
        }
        if (age > CALLBACK_TIMEOUT) {
          timeoutTasks.push({
            remoteTaskId,
            taskId: task.taskId,
            aigc: task.aigc,
            platform: task.platform,
            ageMinutes: Math.floor(age / 60000),
            createdAt: new Date(task.createdAt).toLocaleString('zh-CN'),
          });
        }
      } catch (parseError) {
        log('red', `解析任务数据失败: ${remoteTaskId}`);
      }
    }

    if (timeoutTasks.length > 0) {
      log('red', `\n发现 ${timeoutTasks.length} 个超时的回调等待任务:`);
      for (const task of timeoutTasks) {
        log('red', ` - 任务ID: ${task.taskId}, 远程ID: ${task.remoteTaskId}, 平台: ${task.aigc}:${task.platform}, 已等待: ${task.ageMinutes}分钟, 创建时间: ${task.createdAt}`);
      }
      issues.push({ type: 'callback_pending_timeout', count: timeoutTasks.length, tasks: timeoutTasks });
    } else {
      log('green', `✓ 所有回调等待任务都在正常时间内`);
    }
  } catch (error) {
    log('red', `✗ 检测回调等待任务失败:`, error.message);
  }
  return issues;
}

/**
 * Reports a backlog in the pending-message list and samples the first entries.
 * Each list element is treated as the key of a hash holding taskId/retryCount/timestamp.
 * @returns {Promise<object[]>} At most one issue of type `pending_messages_backlog`.
 */
async function checkPendingMessages() {
  log('cyan', '\n========== 检测待处理消息 ==========');
  const issues = [];
  try {
    // Only the length and a small sample are needed, so avoid fetching the whole list.
    const total = await redis.lLen(QUEUE_NAMES.pendingMessages);
    if (total === 0) {
      log('green', `✓ 没有待处理消息`);
      return issues;
    }

    log('yellow', `⚠️ 待处理消息列表有 ${total} 个消息`);
    const sampleKeys = await redis.lRange(QUEUE_NAMES.pendingMessages, 0, PENDING_SAMPLE_SIZE - 1);
    const messageDetails = [];
    for (const key of sampleKeys) {
      try {
        const data = await redis.hGetAll(key);
        if (data?.taskId) {
          messageDetails.push({
            key,
            taskId: data.taskId,
            retryCount: data.retryCount || 0,
            timestamp: data.timestamp
              ? new Date(Number.parseInt(data.timestamp, 10)).toLocaleString('zh-CN')
              : 'unknown',
          });
        }
      } catch (sampleError) {
        // Sampling is best-effort, but record why a sample is missing instead of hiding it.
        log('red', `读取待处理消息失败: ${key}`, sampleError.message);
      }
    }

    if (messageDetails.length > 0) {
      log('yellow', `\n前${PENDING_SAMPLE_SIZE}个待处理消息:`);
      for (const msg of messageDetails) {
        log('yellow', ` - 任务ID: ${msg.taskId}, 重试次数: ${msg.retryCount}, 时间: ${msg.timestamp}`);
      }
    }
    issues.push({ type: 'pending_messages_backlog', count: total, samples: messageDetails });
  } catch (error) {
    log('red', `✗ 检测待处理消息失败:`, error.message);
  }
  return issues;
}

/**
 * Prints the global and per-platform counters from the InitInfo JSON document and
 * reports counters that are negative, non-zero, or above the platform's MAX_CONCURRENT.
 * @returns {Promise<object[]>} Counter-related issues.
 */
async function checkCounters() {
  log('cyan', '\n========== 检测计数器 ==========');
  const issues = [];
  try {
    // With a `$` JSONPath the result is an array of matches; the document is element 0.
    const initInfo = await redis.json.get(QUEUE_NAMES.initInfo, { path: '$' });
    if (!initInfo || !initInfo[0]) {
      log('yellow', `⚠️ 未找到初始化信息 [${QUEUE_NAMES.initInfo}]`);
      issues.push({ type: 'init_info_missing' });
      return issues;
    }
    const info = initInfo[0];

    log('blue', '\n全局计数器:');
    log('blue', ` PQtasksALL (处理队列总任务数): ${info.PQtasksALL || 0}`);
    log('blue', ` RQtasksALL (结果队列总任务数): ${info.RQtasksALL || 0}`);
    log('blue', ` CQtasksALL (回调队列总任务数): ${info.CQtasksALL || 0}`);
    log('blue', ` EQtaskALL (错误队列总任务数): ${info.EQtaskALL || 0}`);

    if (info.platforms) {
      log('blue', '\n平台计数器:');
      for (const [key, platform] of Object.entries(info.platforms)) {
        const wqCount = platform.WQtasks ?? 0;
        const pqCount = platform.PQtasks ?? 0;
        // NOTE(review): a platform without MAX_CONCURRENT defaults to 0, so any running
        // task is reported as exceeding concurrency — confirm this is intended.
        const maxConcurrency = platform.MAX_CONCURRENT ?? 0;
        log('blue', ` [${key}]`);
        log('blue', ` WQtasks (等待队列任务数): ${wqCount}`);
        log('blue', ` PQtasks (处理队列任务数): ${pqCount}`);
        log('blue', ` MAX_CONCURRENT (最大并发数): ${maxConcurrency}`);

        if (wqCount < 0) {
          log('red', `✗ 平台 [${key}] 等待队列计数器为负值: ${wqCount}`);
          issues.push({ type: 'wait_counter_negative', platform: key, count: wqCount });
        } else if (wqCount > 0) {
          log('yellow', `⚠️ 平台 [${key}] 等待队列计数器不为零: ${wqCount}`);
          issues.push({ type: 'wait_counter_not_zero', platform: key, count: wqCount });
        }

        if (pqCount < 0) {
          log('red', `✗ 平台 [${key}] 处理队列计数器为负值: ${pqCount}`);
          issues.push({ type: 'process_counter_negative', platform: key, count: pqCount });
        } else if (pqCount > maxConcurrency) {
          log('red', `✗ 平台 [${key}] 处理队列任务数 (${pqCount}) 超过最大并发数 (${maxConcurrency})`);
          issues.push({ type: 'process_counter_exceeds_concurrency', platform: key, count: pqCount, max: maxConcurrency });
        }
      }
    }

    if (info.PQtasksALL > 0) {
      log('yellow', `⚠️ 全局处理队列计数器不为零: ${info.PQtasksALL}`);
      issues.push({ type: 'global_process_counter_not_zero', count: info.PQtasksALL });
    } else if (info.PQtasksALL < 0) {
      log('red', `✗ 全局处理队列计数器为负值: ${info.PQtasksALL}`);
      issues.push({ type: 'global_process_counter_negative', count: info.PQtasksALL });
    }

    if (info.CQtasksALL > 0) {
      log('yellow', `⚠️ 全局回调队列计数器不为零: ${info.CQtasksALL}`);
      issues.push({ type: 'global_callback_counter_not_zero', count: info.CQtasksALL });
    } else if (info.CQtasksALL < 0) {
      log('red', `✗ 全局回调队列计数器为负值: ${info.CQtasksALL}`);
      issues.push({ type: 'global_callback_counter_negative', count: info.CQtasksALL });
    }
  } catch (error) {
    log('red', `✗ 检测计数器失败:`, error.message);
  }
  return issues;
}

/**
 * Compares the stored counters against the real sizes of the lists/hash they track:
 * PQtasksALL vs. polling + callback process lists, CQtasksALL vs. the callback-pending
 * hash, and each platform's WQtasks vs. its `waitQueue` list.
 * @returns {Promise<object[]>} Issues of type `counter_queue_mismatch` / `wait_queue_counter_mismatch`.
 */
async function checkQueueCounterConsistency() {
  log('cyan', '\n========== 检测队列与计数器一致性 ==========');
  const issues = [];
  try {
    const initInfo = await redis.json.get(QUEUE_NAMES.initInfo, { path: '$' });
    if (!initInfo || !initInfo[0]) {
      return issues;
    }
    const info = initInfo[0];

    const actualPollingLength = await redis.lLen(QUEUE_NAMES.processPolling);
    const actualCallbackLength = await redis.lLen(QUEUE_NAMES.processCallback);
    const expectedPQtasksALL = actualPollingLength + actualCallbackLength;
    if (info.PQtasksALL !== expectedPQtasksALL) {
      log('yellow', `⚠️ PQtasksALL (${info.PQtasksALL}) 与实际队列长度 (${expectedPQtasksALL}) 不一致`);
      issues.push({ type: 'counter_queue_mismatch', counter: 'PQtasksALL', counterValue: info.PQtasksALL, actualValue: expectedPQtasksALL });
    } else {
      log('green', `✓ PQtasksALL 与实际队列长度一致`);
    }

    const actualCallbackPending = await redis.hLen(QUEUE_NAMES.callbackPending);
    if (info.CQtasksALL !== actualCallbackPending) {
      log('yellow', `⚠️ CQtasksALL (${info.CQtasksALL}) 与实际回调等待数 (${actualCallbackPending}) 不一致`);
      issues.push({ type: 'counter_queue_mismatch', counter: 'CQtasksALL', counterValue: info.CQtasksALL, actualValue: actualCallbackPending });
    } else {
      log('green', `✓ CQtasksALL 与实际回调等待数一致`);
    }

    if (info.platforms) {
      for (const [key, platform] of Object.entries(info.platforms)) {
        if (!platform.waitQueue) {
          continue;
        }
        const actualWaitQueueLength = await redis.lLen(platform.waitQueue);
        const counterWQtasks = platform.WQtasks ?? 0;
        if (counterWQtasks !== actualWaitQueueLength) {
          log('yellow', `⚠️ 平台 [${key}] WQtasks (${counterWQtasks}) 与实际等待队列长度 (${actualWaitQueueLength}) 不一致`);
          issues.push({ type: 'wait_queue_counter_mismatch', platform: key, counter: 'WQtasks', counterValue: counterWQtasks, actualValue: actualWaitQueueLength });
        } else {
          log('green', `✓ 平台 [${key}] WQtasks 与实际等待队列长度一致`);
        }
      }
    }
  } catch (error) {
    log('red', `✗ 检测一致性失败:`, error.message);
  }
  return issues;
}

/**
 * Formats one issue as a single report line.
 * Counter mismatches are checked before the plain platform case because
 * `wait_queue_counter_mismatch` carries both `platform` and `counter` but no `count`.
 * @param {object} issue - An issue object produced by one of the checks.
 * @returns {string} The line to print.
 */
function formatIssue(issue) {
  if (issue.queue) {
    return ` - 队列: ${issue.queue}, 数量: ${issue.count}`;
  }
  if (issue.counter) {
    const scope = issue.platform ? `平台: ${issue.platform}, ` : '';
    return ` - ${scope}计数器: ${issue.counter}, 计数器值: ${issue.counterValue}, 实际值: ${issue.actualValue}`;
  }
  if (issue.platform) {
    const max = issue.max === undefined ? '' : `, 最大并发数: ${issue.max}`;
    return ` - 平台: ${issue.platform}, 数量: ${issue.count ?? 'N/A'}${max}`;
  }
  return ` - ${JSON.stringify(issue)}`;
}

/**
 * Prints a summary of all issues grouped by type, followed by suggested actions.
 * @param {object[][]} allIssues - One issue array per check.
 */
async function generateReport(allIssues) {
  log('cyan', '\n========================================');
  log('cyan', ' 检测报告汇总');
  log('cyan', '========================================');

  const flatIssues = allIssues.flat();
  if (flatIssues.length === 0) {
    log('green', '\n✓ 没有发现任何问题,队列状态正常!');
    return;
  }
  log('yellow', `\n共发现 ${flatIssues.length} 个问题:\n`);

  const issueTypes = {};
  for (const issue of flatIssues) {
    if (!issueTypes[issue.type]) {
      issueTypes[issue.type] = [];
    }
    issueTypes[issue.type].push(issue);
  }

  for (const [type, typeIssues] of Object.entries(issueTypes)) {
    log('yellow', `\n[${type}] - ${typeIssues.length} 个`);
    for (const issue of typeIssues) {
      log('yellow', formatIssue(issue));
    }
  }

  log('cyan', '\n========== 建议操作 ==========');
  const suggestions = [];
  if (issueTypes.wait_queue_backlog) {
    suggestions.push('等待队列有积压,建议检查消费者是否正常运行');
  }
  if (issueTypes.process_queue_has_tasks || issueTypes.callback_queue_has_tasks) {
    suggestions.push('处理队列有任务,建议检查任务处理逻辑是否卡住');
  }
  if (issueTypes.callback_pending_timeout) {
    suggestions.push('有超时的回调等待任务,建议手动清理或重新处理');
  }
  if (issueTypes.counter_queue_mismatch || issueTypes.wait_queue_counter_mismatch) {
    suggestions.push('计数器与实际队列不一致,建议重置计数器');
  }
  if (issueTypes.pending_messages_backlog) {
    suggestions.push('有待处理消息积压,建议检查消息发送逻辑');
  }
  // Number only the suggestions actually shown so the list has no gaps.
  suggestions.forEach((text, index) => {
    log('blue', `${index + 1}. ${text}`);
  });
}

/**
 * Entry point: connects, runs every check in order, prints the report and disconnects.
 * Sets a non-zero exit code when the run itself fails.
 */
async function main() {
  console.log('\n========================================');
  console.log(' Redis 队列检测脚本');
  console.log(' 项目前缀:', PREFIX);
  console.log(' 检测时间:', new Date().toLocaleString('zh-CN'));
  console.log('========================================\n');

  try {
    await redis.connect();
    log('green', 'Redis 连接成功\n');

    const allIssues = [];
    allIssues.push(await checkWaitQueues());
    allIssues.push(await checkProcessQueues());
    allIssues.push(await checkResultQueues());
    allIssues.push(await checkErrorQueues());
    allIssues.push(await checkCallbackPending());
    allIssues.push(await checkPendingMessages());
    allIssues.push(await checkCounters());
    allIssues.push(await checkQueueCounterConsistency());

    await generateReport(allIssues);
  } catch (error) {
    log('red', '\n检测过程出错:', error.message);
    console.error(error);
    process.exitCode = 1;
  } finally {
    // disconnect() rejects on a client that never opened (e.g. connect() failed),
    // which would surface as an unhandled rejection after the error above.
    if (redis.isOpen) {
      await redis.disconnect();
      log('blue', '\nRedis 连接已关闭');
    }
  }
}

main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});