shuzhiren-comfyui/任务队列后端/utils/mdWebSocketServer.js

188 lines
5.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dotenv from 'dotenv';
dotenv.config();
import { WebSocketServer as WSServer } from 'ws';
import { getExternalCapacityMax } from '../config/Config.js';
import redis from '../redis/index.js';
// Redis key 定义
const REDIS_KEYS = {
CAPACITY: `${process.env.PROJECT_PREFIX}:md:capacity`,
JWT: `${process.env.PROJECT_PREFIX}:md:jwt`
};
class MDWebSocketServer {
constructor() {
this.wss = null;
this.connectedClients = new Map();
this.currentJwtToken = null;
this.currentCapacity = { internal: 0, external: 0 };
this.instances = new Map();
this.port = process.env.MESSAGE_DISPATCHER_WS_PORT || 8088;
}
async init() {
return new Promise((resolve, reject) => {
this.wss = new WSServer({ port: this.port });
this.wss.on('listening', () => {
console.log(`[MDWebSocketServer] WebSocket 服务已启动,端口: ${this.port}`);
resolve();
});
this.wss.on('connection', (ws) => {
console.log('[MDWebSocketServer] 新的 WebSocket 连接已建立');
const clientId = Date.now().toString();
this.connectedClients.set(clientId, ws);
ws.on('message', (data) => {
this.handleMessage(ws, data);
});
ws.on('close', () => {
console.log('[MDWebSocketServer] WebSocket 连接已关闭');
this.connectedClients.delete(clientId);
});
ws.on('error', (error) => {
console.error('[MDWebSocketServer] WebSocket 连接错误:', error);
this.connectedClients.delete(clientId);
});
});
this.wss.on('error', (error) => {
console.error('[MDWebSocketServer] WebSocket 服务错误:', error);
reject(error);
});
});
}
handleMessage(ws, data) {
try {
const message = JSON.parse(data.toString());
console.log(`[MDWebSocketServer] 收到消息: ${message.type}`);
switch (message.type) {
case 'JWT_UPDATE':
this.handleJwtUpdate(message.data);
break;
case 'CAPACITY_UPDATE':
this.handleCapacityUpdate(message.data);
break;
case 'INSTANCE_ONLINE':
this.handleInstanceOnline(message.data);
break;
case 'INSTANCE_OFFLINE':
this.handleInstanceOffline(message.data);
break;
case 'HEARTBEAT':
this.handleHeartbeat(message.data, ws);
break;
default:
console.log('[MDWebSocketServer] 未知消息类型:', message.type);
}
} catch (error) {
console.error('[MDWebSocketServer] 解析消息失败:', error);
}
}
async handleJwtUpdate(data) {
this.currentJwtToken = data.token;
console.log('[MDWebSocketServer] JWT Token 已更新');
// 同时存储到 Redis供 worker 线程读取
try {
await redis.set(REDIS_KEYS.JWT, data.token);
console.log('[MDWebSocketServer] JWT Token 已存储到 Redis');
} catch (error) {
console.error('[MDWebSocketServer] JWT Token 存储到 Redis 失败:', error);
}
}
async handleCapacityUpdate(data) {
if (data.summary) {
this.currentCapacity.internal = data.summary.onlineInstances - data.summary.busyInstances;
console.log(`[MDWebSocketServer] 算力状态已更新: 内部可用 = ${this.currentCapacity.internal}`);
// 同时存储到 Redis供 worker 线程读取
try {
await redis.set(REDIS_KEYS.CAPACITY, this.currentCapacity.internal.toString());
console.log(`[MDWebSocketServer] 算力信息已存储到 Redis: ${this.currentCapacity.internal}`);
} catch (error) {
console.error('[MDWebSocketServer] 算力信息存储到 Redis 失败:', error);
}
}
}
handleInstanceOnline(data) {
this.instances.set(data.instanceId, { ...data, status: 'online' });
console.log(`[MDWebSocketServer] 实例上线: ${data.instanceId}`);
}
handleInstanceOffline(data) {
this.instances.set(data.instanceId, { ...data, status: 'offline' });
console.log(`[MDWebSocketServer] 实例下线: ${data.instanceId}`);
}
handleHeartbeat(data, ws) {
ws.send(JSON.stringify({
type: 'HEARTBEAT_ACK',
data: { timestamp: new Date().toISOString() }
}));
}
getJwtToken() {
return this.currentJwtToken;
}
getInternalCapacity() {
return this.currentCapacity.internal;
}
getExternalCapacity() {
return getExternalCapacityMax() || 10;
}
getInstances() {
return Array.from(this.instances.values());
}
hasConnectedClients() {
return this.connectedClients.size > 0;
}
/**
* 从 Redis 读取内部算力信息
* @returns {Promise<number>} 内部可用算力
*/
async getInternalCapacityFromRedis() {
try {
const capacity = await redis.get(REDIS_KEYS.CAPACITY);
const result = capacity ? parseInt(capacity, 10) : 0;
console.log(`[MDWebSocketServer] 从 Redis 读取算力信息: ${result}`);
return result;
} catch (error) {
console.error('[MDWebSocketServer] 从 Redis 读取算力信息失败:', error);
return 0;
}
}
/**
* 从 Redis 读取 JWT Token
* @returns {Promise<string|null>} JWT Token
*/
async getJwtTokenFromRedis() {
try {
const token = await redis.get(REDIS_KEYS.JWT);
console.log(`[MDWebSocketServer] 从 Redis 读取 JWT Token: ${token ? '存在' : '不存在'}`);
return token;
} catch (error) {
console.error('[MDWebSocketServer] 从 Redis 读取 JWT Token 失败:', error);
return null;
}
}
}
export default new MDWebSocketServer();