import redis from './index.js'; import initQueue from './initQueue.js'; const logger = { info: (message) => { const timestamp = new Date().toISOString(); console.log(`[${timestamp}] INFO: ${message}`); }, error: (message, error) => { const timestamp = new Date().toISOString(); console.error(`[${timestamp}] ERROR: ${message}`, error || ''); }, debug: (message) => { const timestamp = new Date().toISOString(); console.debug(`[${timestamp}] DEBUG: ${message}`); } }; class MessagePersistence { constructor() { this.pendingMessagesKey = initQueue.pendingMessages; } async savePendingMessage(backendId, message) { try { const messageData = { backendId, message, timestamp: Date.now(), retryCount: 0 }; const messageKey = `${this.pendingMessagesKey}:${backendId}:${Date.now()}`; await redis.hSet(messageKey, messageData); await redis.lPush(this.pendingMessagesKey, messageKey); logger.debug(`保存待发送消息: backendId=${backendId}, messageKey=${messageKey}`); return messageKey; } catch (error) { logger.error(`保存待发送消息失败: backendId=${backendId}`, error); throw error; } } async getPendingMessages(backendId) { try { const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1); const pendingMessages = []; for (const messageKey of allMessageKeys) { if (messageKey.includes(`:${backendId}:`)) { const messageData = await redis.hGetAll(messageKey); if (messageData && messageData.message) { pendingMessages.push({ key: messageKey, backendId: messageData.backendId, message: messageData.message, timestamp: parseInt(messageData.timestamp), retryCount: parseInt(messageData.retryCount || 0) }); } } } logger.debug(`获取待发送消息: backendId=${backendId}, count=${pendingMessages.length}`); return pendingMessages; } catch (error) { logger.error(`获取待发送消息失败: backendId=${backendId}`, error); return []; } } async removePendingMessage(messageKey) { try { await redis.del(messageKey); await redis.lRem(this.pendingMessagesKey, 1, messageKey); logger.debug(`删除已发送消息: messageKey=${messageKey}`); } catch (error) { logger.error(`删除待发送消息失败: messageKey=${messageKey}`, error); } } async incrementRetryCount(messageKey) { try { await redis.hIncrBy(messageKey, 'retryCount', 1); logger.debug(`增加重试次数: messageKey=${messageKey}`); } catch (error) { logger.error(`增加重试次数失败: messageKey=${messageKey}`, error); } } async cleanupOldMessages(maxAge = 7 * 24 * 60 * 60 * 1000) { try { const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1); const now = Date.now(); const keysToDelete = []; for (const messageKey of allMessageKeys) { const messageData = await redis.hGetAll(messageKey); if (messageData && messageData.timestamp) { const messageAge = now - parseInt(messageData.timestamp); if (messageAge > maxAge) { keysToDelete.push(messageKey); } } } if (keysToDelete.length > 0) { const multi = redis.multi(); for (const key of keysToDelete) { multi.del(key); multi.lRem(this.pendingMessagesKey, 1, key); } await multi.exec(); logger.info(`清理过期消息: count=${keysToDelete.length}`); } } catch (error) { logger.error('清理过期消息失败:', error); } } } export default new MessagePersistence();