124 lines
3.8 KiB
JavaScript
124 lines
3.8 KiB
JavaScript
import redis from './index.js';
|
|
import initQueue from './initQueue.js';
|
|
|
|
const logger = {
|
|
info: (message) => {
|
|
const timestamp = new Date().toISOString();
|
|
console.log(`[${timestamp}] INFO: ${message}`);
|
|
},
|
|
error: (message, error) => {
|
|
const timestamp = new Date().toISOString();
|
|
console.error(`[${timestamp}] ERROR: ${message}`, error || '');
|
|
},
|
|
debug: (message) => {
|
|
const timestamp = new Date().toISOString();
|
|
console.debug(`[${timestamp}] DEBUG: ${message}`);
|
|
}
|
|
};
|
|
|
|
class MessagePersistence {
|
|
constructor() {
|
|
this.pendingMessagesKey = initQueue.pendingMessages;
|
|
}
|
|
|
|
async savePendingMessage(backendId, message) {
|
|
try {
|
|
const messageData = {
|
|
backendId,
|
|
message,
|
|
timestamp: Date.now(),
|
|
retryCount: 0
|
|
};
|
|
|
|
const messageKey = `${this.pendingMessagesKey}:${backendId}:${Date.now()}`;
|
|
await redis.hSet(messageKey, messageData);
|
|
await redis.lPush(this.pendingMessagesKey, messageKey);
|
|
|
|
logger.debug(`保存待发送消息: backendId=${backendId}, messageKey=${messageKey}`);
|
|
return messageKey;
|
|
} catch (error) {
|
|
logger.error(`保存待发送消息失败: backendId=${backendId}`, error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async getPendingMessages(backendId) {
|
|
try {
|
|
const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1);
|
|
const pendingMessages = [];
|
|
|
|
for (const messageKey of allMessageKeys) {
|
|
if (messageKey.includes(`:${backendId}:`)) {
|
|
const messageData = await redis.hGetAll(messageKey);
|
|
if (messageData && messageData.message) {
|
|
pendingMessages.push({
|
|
key: messageKey,
|
|
backendId: messageData.backendId,
|
|
message: messageData.message,
|
|
timestamp: parseInt(messageData.timestamp),
|
|
retryCount: parseInt(messageData.retryCount || 0)
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.debug(`获取待发送消息: backendId=${backendId}, count=${pendingMessages.length}`);
|
|
return pendingMessages;
|
|
} catch (error) {
|
|
logger.error(`获取待发送消息失败: backendId=${backendId}`, error);
|
|
return [];
|
|
}
|
|
}
|
|
|
|
async removePendingMessage(messageKey) {
|
|
try {
|
|
await redis.del(messageKey);
|
|
await redis.lRem(this.pendingMessagesKey, 1, messageKey);
|
|
logger.debug(`删除已发送消息: messageKey=${messageKey}`);
|
|
} catch (error) {
|
|
logger.error(`删除待发送消息失败: messageKey=${messageKey}`, error);
|
|
}
|
|
}
|
|
|
|
async incrementRetryCount(messageKey) {
|
|
try {
|
|
await redis.hIncrBy(messageKey, 'retryCount', 1);
|
|
logger.debug(`增加重试次数: messageKey=${messageKey}`);
|
|
} catch (error) {
|
|
logger.error(`增加重试次数失败: messageKey=${messageKey}`, error);
|
|
}
|
|
}
|
|
|
|
async cleanupOldMessages(maxAge = 7 * 24 * 60 * 60 * 1000) {
|
|
try {
|
|
const allMessageKeys = await redis.lRange(this.pendingMessagesKey, 0, -1);
|
|
const now = Date.now();
|
|
const keysToDelete = [];
|
|
|
|
for (const messageKey of allMessageKeys) {
|
|
const messageData = await redis.hGetAll(messageKey);
|
|
if (messageData && messageData.timestamp) {
|
|
const messageAge = now - parseInt(messageData.timestamp);
|
|
if (messageAge > maxAge) {
|
|
keysToDelete.push(messageKey);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (keysToDelete.length > 0) {
|
|
const multi = redis.multi();
|
|
for (const key of keysToDelete) {
|
|
multi.del(key);
|
|
multi.lRem(this.pendingMessagesKey, 1, key);
|
|
}
|
|
await multi.exec();
|
|
logger.info(`清理过期消息: count=${keysToDelete.length}`);
|
|
}
|
|
} catch (error) {
|
|
logger.error('清理过期消息失败:', error);
|
|
}
|
|
}
|
|
}
|
|
|
|
export default new MessagePersistence();
|