// Source: shuzhiren-comfyui/任务队列后端/redis/messagePersistence.js
// (124 lines, 3.8 KiB, JavaScript)

import { randomUUID } from 'node:crypto';

import redis from './index.js';
import initQueue from './initQueue.js';
/**
 * Minimal console logger that prefixes every line with an ISO-8601 timestamp
 * and a severity label.
 *
 * - `info(message)`  -> `console.log`
 * - `error(message, error)` -> `console.error`; the optional `error` is passed
 *   as a second argument (an empty string is passed when it is falsy)
 * - `debug(message)` -> `console.debug`
 */
const logger = (() => {
  // Builds the "[timestamp] LEVEL: message" line shared by all severities.
  const formatLine = (level, message) => `[${new Date().toISOString()}] ${level}: ${message}`;

  return {
    info(message) {
      console.log(formatLine('INFO', message));
    },
    error(message, error) {
      console.error(formatLine('ERROR', message), error || '');
    },
    debug(message) {
      console.debug(formatLine('DEBUG', message));
    },
  };
})();
/**
 * Redis-backed store for messages that are waiting to be delivered to a backend.
 *
 * Storage layout:
 * - every message is a Redis hash (`backendId`, `message`, `timestamp`, `retryCount`)
 *   stored under `<pendingMessagesKey>:<backendId>:<timestamp>-<uuid>`;
 * - the Redis list at `pendingMessagesKey` holds the keys of all those hashes.
 */
class MessagePersistence {
  constructor() {
    // Name of the Redis list that indexes every pending-message hash key.
    this.pendingMessagesKey = initQueue.pendingMessages;
  }

  /**
   * Persists a message for later delivery.
   *
   * NOTE(review): `message` is written as a hash field value as-is; presumably
   * callers pass a string (e.g. serialized JSON) — confirm against callers.
   *
   * @param {string|number} backendId - Identifier of the target backend.
   * @param {string} message - Payload to store.
   * @returns {Promise<string>} The Redis key under which the message was stored.
   * @throws Re-throws any Redis error after logging it.
   */
  async savePendingMessage(backendId, message) {
    try {
      // Read the clock once so the stored timestamp and the key always agree.
      const timestamp = Date.now();
      const messageData = {
        backendId,
        message,
        timestamp,
        retryCount: 0,
      };
      // The UUID suffix keeps keys unique when several messages for the same
      // backend are saved within one millisecond; without it the later message
      // would overwrite the earlier hash.
      const messageKey = `${this.pendingMessagesKey}:${backendId}:${timestamp}-${randomUUID()}`;
      await redis.hSet(messageKey, messageData);
      await redis.lPush(this.pendingMessagesKey, messageKey);
      logger.debug(`保存待发送消息: backendId=${backendId}, messageKey=${messageKey}`);
      return messageKey;
    } catch (error) {
      logger.error(`保存待发送消息失败: backendId=${backendId}`, error);
      throw error;
    }
  }

  /**
   * Loads every pending message that belongs to the given backend.
   *
   * @param {string|number} backendId - Identifier of the backend.
   * @returns {Promise<Array<{key: string, backendId: string, message: string,
   *   timestamp: number, retryCount: number}>>} Matching messages in list order;
   *   an empty array when nothing matches or when Redis fails (the failure is logged).
   */
  async getPendingMessages(backendId) {
    try {
      // Anchor on the full key prefix: a bare substring test on ":<id>:" can
      // also match keys that belong to a different backend.
      const keyPrefix = `${this.pendingMessagesKey}:${backendId}:`;
      const expectedBackendId = String(backendId);

      const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1);
      const candidateKeys = allMessageKeys.filter((messageKey) => messageKey.startsWith(keyPrefix));

      // The lookups are independent, so issue them together instead of one by one.
      const records = await Promise.all(
        candidateKeys.map((messageKey) => redis.hGetAll(messageKey)),
      );

      const pendingMessages = [];
      for (let index = 0; index < candidateKeys.length; index += 1) {
        const messageData = records[index];
        if (!messageData || !messageData.message) {
          continue;
        }
        // A backend id that itself contains ":" can share a prefix with another
        // id, so confirm ownership against the value stored in the hash.
        if (messageData.backendId !== undefined && messageData.backendId !== expectedBackendId) {
          continue;
        }
        pendingMessages.push({
          key: candidateKeys[index],
          backendId: messageData.backendId,
          message: messageData.message,
          timestamp: Number.parseInt(messageData.timestamp, 10),
          retryCount: Number.parseInt(messageData.retryCount || '0', 10),
        });
      }

      logger.debug(`获取待发送消息: backendId=${backendId}, count=${pendingMessages.length}`);
      return pendingMessages;
    } catch (error) {
      logger.error(`获取待发送消息失败: backendId=${backendId}`, error);
      return [];
    }
  }

  /**
   * Deletes a stored message and its entry in the index list.
   * Best effort: Redis failures are logged and not re-thrown.
   *
   * @param {string} messageKey - Key returned by `savePendingMessage`.
   * @returns {Promise<void>}
   */
  async removePendingMessage(messageKey) {
    try {
      await redis.del(messageKey);
      await redis.lRem(this.pendingMessagesKey, 1, messageKey);
      logger.debug(`删除已发送消息: messageKey=${messageKey}`);
    } catch (error) {
      logger.error(`删除待发送消息失败: messageKey=${messageKey}`, error);
    }
  }

  /**
   * Adds one to the `retryCount` field of a stored message.
   * Best effort: Redis failures are logged and not re-thrown.
   *
   * @param {string} messageKey - Key returned by `savePendingMessage`.
   * @returns {Promise<void>}
   */
  async incrementRetryCount(messageKey) {
    try {
      // HINCRBY creates the hash when it is missing; skip keys that were already
      // removed so no stray hash (unreferenced by the index list) is left behind.
      if (!(await redis.exists(messageKey))) {
        logger.debug(`跳过增加重试次数(消息不存在): messageKey=${messageKey}`);
        return;
      }
      await redis.hIncrBy(messageKey, 'retryCount', 1);
      logger.debug(`增加重试次数: messageKey=${messageKey}`);
    } catch (error) {
      logger.error(`增加重试次数失败: messageKey=${messageKey}`, error);
    }
  }

  /**
   * Removes messages older than `maxAge`, plus index-list entries whose hash no
   * longer exists. Best effort: Redis failures are logged and not re-thrown.
   *
   * @param {number} [maxAge=604800000] - Maximum age in milliseconds (default: 7 days).
   * @returns {Promise<void>}
   */
  async cleanupOldMessages(maxAge = 7 * 24 * 60 * 60 * 1000) {
    try {
      const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1);
      if (allMessageKeys.length === 0) {
        return;
      }

      const now = Date.now();
      const records = await Promise.all(
        allMessageKeys.map((messageKey) => redis.hGetAll(messageKey)),
      );

      const expiredKeys = [];
      const orphanedKeys = [];
      for (let index = 0; index < allMessageKeys.length; index += 1) {
        const messageKey = allMessageKeys[index];
        const messageData = records[index];

        // An empty hash means the message is gone but its list entry survived
        // (e.g. DEL succeeded and the following LREM failed). Such entries carry
        // no timestamp, so they would otherwise never be reclaimed.
        if (!messageData || Object.keys(messageData).length === 0) {
          orphanedKeys.push(messageKey);
          continue;
        }
        if (!messageData.timestamp) {
          continue;
        }
        const messageAge = now - Number.parseInt(messageData.timestamp, 10);
        if (messageAge > maxAge) {
          expiredKeys.push(messageKey);
        }
      }

      if (expiredKeys.length === 0 && orphanedKeys.length === 0) {
        return;
      }

      const multi = redis.multi();
      for (const key of expiredKeys) {
        multi.del(key);
        multi.lRem(this.pendingMessagesKey, 1, key);
      }
      for (const key of orphanedKeys) {
        multi.lRem(this.pendingMessagesKey, 1, key);
      }
      await multi.exec();

      if (expiredKeys.length > 0) {
        logger.info(`清理过期消息: count=${expiredKeys.length}`);
      }
      if (orphanedKeys.length > 0) {
        logger.debug(`清理孤立消息索引: count=${orphanedKeys.length}`);
      }
    } catch (error) {
      logger.error('清理过期消息失败:', error);
    }
  }
}
// Default export is one MessagePersistence instance created when this module is
// first evaluated, so every importer of this module receives the same object.
export default new MessagePersistence();