188 lines
5.5 KiB
JavaScript
188 lines
5.5 KiB
JavaScript
import dotenv from 'dotenv';
|
||
|
||
dotenv.config();
|
||
import { WebSocketServer as WSServer } from 'ws';
|
||
import { getExternalCapacityMax } from '../config/Config.js';
|
||
import redis from '../redis/index.js';
|
||
|
||
// Redis key 定义
|
||
const REDIS_KEYS = {
|
||
CAPACITY: `${process.env.PROJECT_PREFIX}:md:capacity`,
|
||
JWT: `${process.env.PROJECT_PREFIX}:md:jwt`
|
||
};
|
||
|
||
class MDWebSocketServer {
|
||
constructor() {
|
||
this.wss = null;
|
||
this.connectedClients = new Map();
|
||
this.currentJwtToken = null;
|
||
this.currentCapacity = { internal: 0, external: 0 };
|
||
this.instances = new Map();
|
||
this.port = process.env.MESSAGE_DISPATCHER_WS_PORT || 8088;
|
||
}
|
||
|
||
async init() {
|
||
return new Promise((resolve, reject) => {
|
||
this.wss = new WSServer({ port: this.port });
|
||
|
||
this.wss.on('listening', () => {
|
||
console.log(`[MDWebSocketServer] WebSocket 服务已启动,端口: ${this.port}`);
|
||
resolve();
|
||
});
|
||
|
||
this.wss.on('connection', (ws) => {
|
||
console.log('[MDWebSocketServer] 新的 WebSocket 连接已建立');
|
||
|
||
const clientId = Date.now().toString();
|
||
this.connectedClients.set(clientId, ws);
|
||
|
||
ws.on('message', (data) => {
|
||
this.handleMessage(ws, data);
|
||
});
|
||
|
||
ws.on('close', () => {
|
||
console.log('[MDWebSocketServer] WebSocket 连接已关闭');
|
||
this.connectedClients.delete(clientId);
|
||
});
|
||
|
||
ws.on('error', (error) => {
|
||
console.error('[MDWebSocketServer] WebSocket 连接错误:', error);
|
||
this.connectedClients.delete(clientId);
|
||
});
|
||
});
|
||
|
||
this.wss.on('error', (error) => {
|
||
console.error('[MDWebSocketServer] WebSocket 服务错误:', error);
|
||
reject(error);
|
||
});
|
||
});
|
||
}
|
||
|
||
handleMessage(ws, data) {
|
||
try {
|
||
const message = JSON.parse(data.toString());
|
||
console.log(`[MDWebSocketServer] 收到消息: ${message.type}`);
|
||
|
||
switch (message.type) {
|
||
case 'JWT_UPDATE':
|
||
this.handleJwtUpdate(message.data);
|
||
break;
|
||
case 'CAPACITY_UPDATE':
|
||
this.handleCapacityUpdate(message.data);
|
||
break;
|
||
case 'INSTANCE_ONLINE':
|
||
this.handleInstanceOnline(message.data);
|
||
break;
|
||
case 'INSTANCE_OFFLINE':
|
||
this.handleInstanceOffline(message.data);
|
||
break;
|
||
case 'HEARTBEAT':
|
||
this.handleHeartbeat(message.data, ws);
|
||
break;
|
||
default:
|
||
console.log('[MDWebSocketServer] 未知消息类型:', message.type);
|
||
}
|
||
} catch (error) {
|
||
console.error('[MDWebSocketServer] 解析消息失败:', error);
|
||
}
|
||
}
|
||
|
||
async handleJwtUpdate(data) {
|
||
this.currentJwtToken = data.token;
|
||
console.log('[MDWebSocketServer] JWT Token 已更新');
|
||
|
||
// 同时存储到 Redis,供 worker 线程读取
|
||
try {
|
||
await redis.set(REDIS_KEYS.JWT, data.token);
|
||
console.log('[MDWebSocketServer] JWT Token 已存储到 Redis');
|
||
} catch (error) {
|
||
console.error('[MDWebSocketServer] JWT Token 存储到 Redis 失败:', error);
|
||
}
|
||
}
|
||
|
||
async handleCapacityUpdate(data) {
|
||
if (data.summary) {
|
||
this.currentCapacity.internal = data.summary.onlineInstances - data.summary.busyInstances;
|
||
console.log(`[MDWebSocketServer] 算力状态已更新: 内部可用 = ${this.currentCapacity.internal}`);
|
||
|
||
// 同时存储到 Redis,供 worker 线程读取
|
||
try {
|
||
await redis.set(REDIS_KEYS.CAPACITY, this.currentCapacity.internal.toString());
|
||
console.log(`[MDWebSocketServer] 算力信息已存储到 Redis: ${this.currentCapacity.internal}`);
|
||
} catch (error) {
|
||
console.error('[MDWebSocketServer] 算力信息存储到 Redis 失败:', error);
|
||
}
|
||
}
|
||
}
|
||
|
||
handleInstanceOnline(data) {
|
||
this.instances.set(data.instanceId, { ...data, status: 'online' });
|
||
console.log(`[MDWebSocketServer] 实例上线: ${data.instanceId}`);
|
||
}
|
||
|
||
handleInstanceOffline(data) {
|
||
this.instances.set(data.instanceId, { ...data, status: 'offline' });
|
||
console.log(`[MDWebSocketServer] 实例下线: ${data.instanceId}`);
|
||
}
|
||
|
||
handleHeartbeat(data, ws) {
|
||
ws.send(JSON.stringify({
|
||
type: 'HEARTBEAT_ACK',
|
||
data: { timestamp: new Date().toISOString() }
|
||
}));
|
||
}
|
||
|
||
getJwtToken() {
|
||
return this.currentJwtToken;
|
||
}
|
||
|
||
getInternalCapacity() {
|
||
return this.currentCapacity.internal;
|
||
}
|
||
|
||
getExternalCapacity() {
|
||
return getExternalCapacityMax() || 10;
|
||
}
|
||
|
||
getInstances() {
|
||
return Array.from(this.instances.values());
|
||
}
|
||
|
||
hasConnectedClients() {
|
||
return this.connectedClients.size > 0;
|
||
}
|
||
|
||
/**
|
||
* 从 Redis 读取内部算力信息
|
||
* @returns {Promise<number>} 内部可用算力
|
||
*/
|
||
async getInternalCapacityFromRedis() {
|
||
try {
|
||
const capacity = await redis.get(REDIS_KEYS.CAPACITY);
|
||
const result = capacity ? parseInt(capacity, 10) : 0;
|
||
console.log(`[MDWebSocketServer] 从 Redis 读取算力信息: ${result}`);
|
||
return result;
|
||
} catch (error) {
|
||
console.error('[MDWebSocketServer] 从 Redis 读取算力信息失败:', error);
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 从 Redis 读取 JWT Token
|
||
* @returns {Promise<string|null>} JWT Token
|
||
*/
|
||
async getJwtTokenFromRedis() {
|
||
try {
|
||
const token = await redis.get(REDIS_KEYS.JWT);
|
||
console.log(`[MDWebSocketServer] 从 Redis 读取 JWT Token: ${token ? '存在' : '不存在'}`);
|
||
return token;
|
||
} catch (error) {
|
||
console.error('[MDWebSocketServer] 从 Redis 读取 JWT Token 失败:', error);
|
||
return null;
|
||
}
|
||
}
|
||
}
|
||
|
||
export default new MDWebSocketServer();
|