diff --git a/AI实现指导提示词.md b/AI实现指导提示词.md deleted file mode 100644 index 599716d..0000000 --- a/AI实现指导提示词.md +++ /dev/null @@ -1,1083 +0,0 @@ -# AI 实现指导提示词 - -## 任务概述 - -你需要实现将任务队列后端的 runninghub 任务优先分发到内部 message-dispatcher 系统的功能。请按照以下详细指导进行修改。 - ---- - -## 一、任务队列后端项目修改指导 - -### 项目路径 -`d:\Ke_xue_web\独立项目\comfyui桥接器\任务队列后端\` - ---- - -### 修改点 0:新增 WebSocket 通信模块 - -**文件:** `utils/mdWebSocketClient.js`(新建) - -**通信架构说明:** -- message-dispatcher 主动通过 WebSocket 连接任务队列后端 -- WebSocket 用于: - - JWT Token 接收与定期更新 - - 算力状态上报 - - 实例状态变化同步 - - 健康检查 -- **仅任务提交通过 HTTP 接口完成** -- 任务提交接口 URL 通过环境变量获取,与 runninghub 保持一致 -- **所有其他通信(包括 Token 获取)均通过 WebSocket** - -**功能要求:** -1. 作为 WebSocket 服务端,等待 message-dispatcher 连接 -2. 接收 JWT Token 消息(JWT_UPDATE) -3. 接收算力状态更新消息(CAPACITY_UPDATE) -4. 接收实例状态变化消息(INSTANCE_ONLINE/INSTANCE_OFFLINE 等) -5. 发送心跳响应 -6. 提供 Token、算力状态查询接口 - -**核心方法:** -```javascript -class MDWebSocketServer { - constructor() { - this.wss = null; - this.connectedClients = new Map(); - this.currentJwtToken = null; - this.currentCapacity = { internal: 0, external: 0 }; - this.instances = new Map(); - } - - // 初始化并启动 WebSocket 服务 - async init() - - // 获取当前 JWT Token - getJwtToken() - - // 获取当前内部算力可用数 - getInternalCapacity() - - // 获取当前外部容量 - getExternalCapacity() - - // 获取所有实例状态 - getInstances() - - // 检查是否有连接的客户端 - hasConnectedClients() -} -``` - -**WebSocket 消息处理:** -```javascript -// JWT Token 更新 -handleJwtUpdate(data) { - this.currentJwtToken = data.token; - console.log('[MDWebSocketServer] JWT Token 已更新'); -} - -// 算力状态更新 -handleCapacityUpdate(data) { - this.currentCapacity.internal = data.summary.onlineInstances - data.summary.busyInstances; -} - -// 实例上线 -handleInstanceOnline(data) { - this.instances.set(data.instanceId, { ...data, status: 'online' }); -} - -// 实例下线 -handleInstanceOffline(data) { - this.instances.set(data.instanceId, { ...data, status: 'offline' }); -} - -// 心跳响应 -handleHeartbeat(data, ws) { - ws.send(JSON.stringify({ - type: 'HEARTBEAT_ACK', - data: { timestamp: new Date().toISOString() } - })); -} -``` - -**WebSocket 消息格式:** -```javascript -// message-dispatcher 发送的 JWT 更新消息 -{ - type: 'JWT_UPDATE', - data: { - token: 'eyJhbGciOiJIUzI1NiIs...', - expiresAt: '2024-01-02T00:00:00.000Z', - timestamp: '2024-01-01T00:00:00.000Z' - } -} - -// message-dispatcher 发送的心跳 -{ - type: 'HEARTBEAT', - data: { - timestamp: '2024-01-01T00:00:00.000Z' - } -} -``` - ---- - -### 修改点 1:新增 messageDispatcher 平台适配器 - -**文件:** `outside/outPlatforms/messageDispatcher.js`(新建) - -**核心原则:** -- 与 runninghub.js 保持完全一致的接口方法签名 -- 仅修改请求地址和 apiKey 字段的值 -- 请求体所有字段名称、类型、格式与 runninghub 完全一致 -- 任务提交通过 HTTP 接口,URL 通过环境变量获取 - -**要求:** -- 参考 `runninghub.js` 的接口设计,保持方法签名完全一致 -- 实现以下 6 个核心方法: - -```javascript -getGenerateUrl() // 返回 message-dispatcher 的任务提交接口(从环境变量获取) -getGenerateHeader(apikey) // 返回请求头(包含 JWT Token 作为 apiKey) -getGenerateBody(task) // 构造请求体(与 runninghub 完全一致) -getSuccessTasks(response) // 处理成功响应,转换为 runninghub 兼容格式 -getTaskResult(response) // 处理结果回调 -getQueryUrl() // 返回回调地址(与 runninghub 保持一致) -``` - -**请求体标准化规则:** - -| 字段名称 | 类型 | 是否修改 | 说明 | -|---------|------|---------|------| -| `workflow_id` | String | 否 | 原样保留 | -| `node_info_list` | Array | 否 | 原样保留 | -| `apiKey` | String | 是 | 值改为 JWT Token | -| `webhookUrl` | String | 否 | 原样保留 | -| 其他所有字段 | - | 否 | 原样保留 | - -**请求体示例对比:** - -```javascript -// runninghub 请求体(原样) -{ - "workflow_id": "123", - "node_info_list": [...], - "apiKey": "runninghub-api-key-xxx", - "webhookUrl": "http://callback-url" -} - -// message-dispatcher 请求体(仅修改 apiKey 值) -{ - "workflow_id": "123", - "node_info_list": [...], - "apiKey": "eyJhbGciOiJIUzI1NiIs...", // JWT Token - "webhookUrl": "http://callback-url" -} -``` - -**响应转换规则:** -```javascript -// message-dispatcher 响应 -{ success: true, data: { requestId: "xxx" } } - -// 转换为 -{ msg: "success", code: 0, data: { taskId: "xxx" } } -``` - ---- - -### 修改点 2:简化 JWT Token 获取(不再需要独立模块) - -**重要说明:** -- 不再需要独立的 JWTManager 模块 -- JWT Token 由 message-dispatcher 通过 WebSocket 主动推送 -- Token 定期更新也通过 WebSocket 推送 -- 从 MDWebSocketServer 获取 Token 即可 - -**Token 获取方式:** -```javascript -// 从 WebSocket 服务端获取当前 Token -const jwtToken = mdWebSocketServer.getJwtToken(); -``` - ---- - -### 修改点 3:新增任务分流模块 - -**文件:** `utils/taskDistributor.js`(新建) - -**功能要求:** -1. 移除高低优先级区分 -2. 实现统一的任务分流逻辑 -3. 从 WebSocket 服务端获取实时容量信息 -4. 根据容量进行任务分配 - -**核心计算公式:** -``` -内部算力可用数 = 从 MDWebSocketClient 获取 -外部容量上限 = 从配置/环境变量获取(如 10) -总可分发任务上限 = 内部算力可用数 + 外部容量上限 -``` - -**分流策略:** - -| 待分发任务数 | 内部容量 | 外部容量 | 分配结果 | -|------------|---------|---------|---------| -| ≤ 内部 | 实时 | 10 | 全部走内部 | -| > 内部 ≤ 总 | 实时 | 10 | 前N内部,超出部分走外部 | -| > 总 | 实时 | 10 | 内部 + 外部,剩余等待 | - -**具体示例:** -``` -示例 1: -内部 = 30(实时), 外部 = 10, 待分发 = 25 -结果:全部 25 个走内部 - -示例 2: -内部 = 30(实时), 外部 = 10, 待分发 = 35 -结果:30 个内部,5 个外部 - -示例 3: -内部 = 30(实时), 外部 = 10, 待分发 = 45 -结果:30 个内部,10 个外部,5 个等待 -``` - -**核心实现:** -```javascript -async function distributeTasks(tasks, mdWebSocketServer) { - const internalCapacity = mdWebSocketServer.getInternalCapacity(); - const externalCapacity = await getExternalCapacityFromConfig(); - - const internalTasks = tasks.slice(0, internalCapacity); - const externalTasks = tasks.slice(internalCapacity, internalCapacity + externalCapacity); - const remainingTasks = tasks.slice(internalCapacity + externalCapacity); - - return { internalTasks, externalTasks, remainingTasks }; -} -``` - ---- - -### 修改点 4:修改任务分发逻辑(批量) - -**文件:** `worker_threads/wait/waiting.js` - -**修改位置:** 任务批量获取和分发逻辑 - -**要求:** -- 引入 MDWebSocketServer -- 在获取任务后,先调用 taskDistributor 进行分流(传入 WebSocket 服务端) -- 根据分流结果分别分发到内部/外部 -- 剩余任务返回队列 - ---- - -### 修改点 5:修改任务分发逻辑(单个) - -**文件:** `outside/generat.js` - -**修改位置:** `externalPostRequest()` 函数 - -**决策逻辑:** -```javascript -if (platform === 'runninghub') { - if (使用内部算力) { - 尝试使用 messageDispatcher 平台发送任务 - if (成功) { - 返回内部结果 - } else { - 记录降级日志 - 降级使用 runninghub - } - } else { - 使用 runninghub - } -} else { - 使用原平台 -} -``` - -**关键实现:** -- 引入 MDWebSocketServer(从这里获取 JWT Token) -- 引入 messageDispatcher 平台适配器 -- 增加降级日志记录 -- 保持原有错误处理逻辑不变 - ---- - -### 修改点 6:更新平台管理 - -**文件:** `outside/outPlatforms/outside.js` - -**修改内容:** -- 导入 messageDispatcher 模块 -- 将其添加到导出对象中 - -```javascript -import * as runninghub from './runninghub.js'; -import * as jimuai from './JimuAI.js'; -import coze from './coze/coze.js'; -import * as messageDispatcher from './messageDispatcher.js'; // 新增 - -export default { runninghub, jimuai, coze, messageDispatcher }; // 新增 -``` - ---- - -### 修改点 7:更新环境变量配置 - -**文件:** `.env` - -**新增配置:** -```env -# Message Dispatcher 配置 -MESSAGE_DISPATCHER_URL=http://localhost:4000 -MESSAGE_DISPATCHER_WS_PORT=8087 -MESSAGE_DISPATCHER_ENABLED=true -MESSAGE_DISPATCHER_TIMEOUT=30000 - -# 外部容量配置 -EXTERNAL_CAPACITY_MAX=10 -``` - -**说明:** -- `MESSAGE_DISPATCHER_URL`:HTTP 任务提交接口 URL -- `MESSAGE_DISPATCHER_WS_PORT`:任务队列后端 WebSocket 服务端口 -- 不再需要 MD_USERNAME 和 MD_PASSWORD(Token 通过 WebSocket 推送) - ---- - -### 修改点 9:新增配置文件 - -**文件:** `config/messageDispatcher.json`(新建) - -```json -{ - "enabled": true, - "priority": true, - "task": { - "timeout": 30000, - "retryCount": 1 - }, - "capacity": { - "external": 10 - }, - "websocket": { - "port": 8087 - } -} -``` - ---- - -### 修改点 10:算力更新逻辑检查与修复 - -**文件:** `worker_threads/wait/waiting.js`、`worker_threads/callback_result/result.js`、`redis/initQueue.js` 等相关文件 - -**核心需求:** -- 详细检查任务状态管理与算力更新相关代码 -- 验证 30 个任务未全部完成,收到算力更新通知的场景 -- 实现未用算力数计算边界检查,防止负值 -- 添加防御性编程,避免空转或无限循环 -- 实现完整单元测试 - -#### 9.1 检查清单 - -| 检查项 | 检查文件 | 风险级别 | -|---------|---------|---------| -| 算力计数更新原子性 | `redis/initQueue.js` | 高 | -| 任务完成回调时算力计数 | `worker_threads/callback_result/result.js` | 高 | -| 算力减少时的任务处理 | `worker_threads/wait/waiting.js` | 高 | -| 负数检查与防御 | 所有相关文件 | 高 | -| 并发安全 | 所有相关文件 | 中 | - -#### 9.2 核心修复方案 - -**修复 1:添加算力计数边界检查** - -**文件:** `redis/initQueue.js` - -**问题:** 任务完成后,PQtasks(处理中任务数)可能减为负数 - -**修复代码:** -```javascript -async function reducePlatformsProcess(platformKey) { - const key = `${prefix}:platforms:${platformKey}`; - - try { - const current = await redis.hGet(key, 'PQtasks'); - let newValue = parseInt(current) - 1; - - // 边界检查:确保不小于 0 - if (newValue < 0) { - console.warn(`[CapacityManager] 检测到负值: ${platformKey} PQtasks = ${newValue}, 已修正为 0`); - newValue = 0; - } - - await redis.hSet(key, 'PQtasks', newValue.toString()); - console.log(`[CapacityManager] ${platformKey} PQtasks: ${current} -> ${newValue}`); - - return newValue; - } catch (error) { - console.error(`[CapacityManager] 更新 PQtasks 失败:`, error); - throw error; - } -} -``` - -**修复 2:添加算力更新状态锁** - -**文件:** `utils/capacityGuard.js`(新建) - -**功能:** 防止算力更新期间的并发问题 - -```javascript -class CapacityGuard { - constructor() { - this.updateLock = false; - this.pendingUpdates = []; - } - - async acquireLock() { - while (this.updateLock) { - await new Promise(resolve => setTimeout(resolve, 10)); - } - this.updateLock = true; - } - - releaseLock() { - this.updateLock = false; - - // 处理排队的更新 - if (this.pendingUpdates.length > 0) { - const nextUpdate = this.pendingUpdates.shift(); - nextUpdate(); - } - } - - async executeWithLock(fn) { - await this.acquireLock(); - try { - return await fn(); - } finally { - this.releaseLock(); - } - } -} - -export default new CapacityGuard(); -``` - -**修复 3:处理算力突然降低场景** - -**文件:** `worker_threads/wait/waiting.js` - -**场景:** 已发送 30 个任务,算力降低至 20,任务未全部完成 - -**修复代码:** -```javascript -async function handleCapacityReductionFromMD(newInternalCapacity) { - console.log(`[Waiting] 收到算力更新: 内部容量 -> ${newInternalCapacity}`); - - await capacityGuard.executeWithLock(async () => { - // 获取当前正在处理的任务数 - const currentProcessing = await getCurrentProcessingCount(); - - if (currentProcessing <= newInternalCapacity) { - console.log(`[Waiting] 当前处理数 ${currentProcessing} ≤ 新容量 ${newInternalCapacity}, 无需调整`); - return; - } - - const excess = currentProcessing - newInternalCapacity; - console.warn(`[Waiting] 检测到算力降低: 当前处理 ${currentProcessing} > 新容量 ${newInternalCapacity}, 超出 ${excess} 个任务`); - - // 记录超出情况,但不主动取消任务 - // 让任务自然完成,通过回调正确更新计数 - console.log(`[Waiting] 将等待任务自然完成,确保计数正确`); - }); -} -``` - -**修复 4:添加空转防御** - -**文件:** `worker_threads/wait/waiting.js` - -**问题:** 无任务时可能无限循环 - -**修复代码:** -```javascript -// 主循环 -(async () => { - let idleCount = 0; - const MAX_IDLE_COUNT = 10; // 最大连续空转次数 - const IDLE_SLEEP_MS = 10000; // 空转时的睡眠时间 - - while (true) { - try { - const wDeficiency = await judgConcurrency(); - - if (wDeficiency.length > 0) { - idleCount = 0; // 重置空转计数 - logger.info('有可进行处理的队列,数量: ' + wDeficiency.length); - - // ... 原有处理逻辑 ... - } else { - idleCount++; - - if (idleCount >= MAX_IDLE_COUNT) { - logger.debug(`连续空转 ${idleCount} 次,进入长睡眠`); - await new Promise(resolve => setTimeout(resolve, IDLE_SLEEP_MS)); - idleCount = 0; - } else { - logger.debug('没有可处理的队列'); - await new Promise(resolve => setTimeout(resolve, 10000)); - } - } - - } catch (error) { - logger.error('批量处理任务失败:', error); - await new Promise(resolve => setTimeout(resolve, 5000)); - } - } -})(); -``` - -#### 9.3 单元测试方案 - -**测试文件:** `test/capacity.test.js`(新建) - -**测试用例:** - -```javascript -describe('Capacity Management Tests', () => { - - test('正常情况: 任务完成后算力正确增加', async () => { - await initQueue.addPlatformsProcess({ 'digitalHuman:runninghub': 1 }); - const result = await initQueue.reducePlatformsProcess('digitalHuman:runninghub'); - assert.strictEqual(result, 0); - }); - - test('边界情况: 算力为0时尝试减少', async () => { - await initQueue.addPlatformsProcess({ 'digitalHuman:runninghub': 0 }); - const result = await initQueue.reducePlatformsProcess('digitalHuman:runninghub'); - assert.strictEqual(result, 0, '应该保持为0'); - }); - - test('边界情况: 算力从30降低到20时的处理', async () => { - // 模拟发送30个任务 - for (let i = 0; i < 30; i++) { - await initQueue.addPlatformsProcess({ 'digitalHuman:runninghub': 1 }); - } - - // 模拟收到算力降低通知 - await handleCapacityReductionFromMD(20); - - // 验证不会导致负数 - const currentPQtasks = await getCurrentPQtasks(); - assert(currentPQtasks >= 0, 'PQtasks 不能为负数'); - }); - - test('防御性测试: 并发更新', async () => { - const promises = []; - for (let i = 0; i < 100; i++) { - promises.push(initQueue.addPlatformsProcess({ 'digitalHuman:runninghub': 1 })); - promises.push(initQueue.reducePlatformsProcess('digitalHuman:runninghub')); - } - await Promise.all(promises); - const finalCount = await getCurrentPQtasks(); - assert(finalCount >= 0, '并发更新后不能为负数'); - }); - - test('防御性测试: 空转检测', async () => { - const startTime = Date.now(); - // 模拟无任务场景 - await simulateIdleLoop(); - const duration = Date.now() - startTime; - assert(duration < 60000, '不应该无限循环'); - }); -}); -``` - -#### 9.4 验证检查清单 - -| 验证项 | 验证方法 | 预期结果 | -|--------|---------|---------| -| 算力不出现负值 | 检查日志中是否有负值警告 | 如有警告,确认已自动修正为 0 | -| 任务完成后计数正确 | 发送 10 个任务,全部完成 | 最终 PQtasks = 0 | -| 算力降低时不崩溃 | 发送 30 个任务,降低算力到 20 | 系统稳定运行,无错误 | -| 无任务时不空转 | 监控无任务时的 CPU | 进入睡眠,不占用 CPU | -| 并发更新安全 | 100 次并发增减 | 最终计数正确,无负值 | - ---- - -## 二、message-dispatcher 项目修改指导 - -### 项目路径 -`d:\Ke_xue_web\独立项目\comfyui桥接器\message-dispatcher\` - ---- - -### 修改点 1:新增 WebSocket 客户端模块 - -**文件:** `src/md-websocket-client/index.js`(新建) - -**通信架构说明:** -- message-dispatcher 作为 WebSocket 客户端,主动连接任务队列后端 -- WebSocket 用于: - - JWT Token 主动推送与定期更新 - - 算力状态上报 - - 实例状态变化同步 - - 心跳保活 -- **仅任务提交通过 HTTP 接口完成** -- **所有其他通信(包括 Token 推送)均通过 WebSocket** - -**功能要求:** -1. 建立与任务队列后端的 WebSocket 连接 -2. 连接成功后立即推送当前 JWT Token -3. 定期推送 JWT Token 更新(如每 20 小时) -4. 定期推送算力状态(CAPACITY_UPDATE) -5. 实例状态变化时推送(INSTANCE_ONLINE/INSTANCE_OFFLINE 等) -6. 发送心跳保持连接 -7. 自动重连机制(指数退避) - -**核心方法:** -```javascript -class MDWebSocketClient { - constructor() { - this.ws = null; - this.connected = false; - this.reconnectAttempts = 0; - this.tokenPushInterval = null; - this.capacityPushInterval = null; - } - - // 初始化并连接 - async init() - - // 连接到任务队列后端 - async connect() - - // 断开连接 - disconnect() - - // 推送 JWT Token - pushJwtToken() - - // 推送算力状态 - pushCapacityState() - - // 推送实例上线 - pushInstanceOnline(instanceId) - - // 推送实例下线 - pushInstanceOffline(instanceId) - - // 发送消息 - send(message) - - // 处理接收到的消息 - handleMessage(message) -} -``` - -**WebSocket 消息格式(message-dispatcher 发送):** -```javascript -// JWT Token 更新推送 -{ - type: 'JWT_UPDATE', - data: { - token: 'eyJhbGciOiJIUzI1NiIs...', - expiresAt: '2024-01-02T00:00:00.000Z', - timestamp: '2024-01-01T00:00:00.000Z' - } -} - -// 算力状态更新推送 -{ - type: 'CAPACITY_UPDATE', - data: { - timestamp: '2024-01-01T00:00:00.000Z', - bridges: [...], - summary: { - totalBridges: 2, - totalInstances: 8, - onlineInstances: 6, - busyInstances: 2, - availableCapacity: 4 - } - } -} - -// 心跳 -{ - type: 'HEARTBEAT', - data: { - timestamp: '2024-01-01T00:00:00.000Z' - } -} -``` - ---- - -### 修改点 2:修改启动流程,集成 WebSocket 客户端 - -**文件:** `src/index.js` - -**修改内容:** -- 导入并初始化 MDWebSocketClient -- 在服务启动后启动 WebSocket 客户端 -- 在服务关闭时断开 WebSocket 连接 - ---- - -### 修改点 3:新增 runninghub 兼容接口(可选) - -**目标:** 确保接口兼容性,新增 runninghub 兼容的任务提交接口 - -**文件:** `src/api/index.js` - -**新增接口(可选):** -```javascript -// 兼容 runninghub 格式的任务提交接口 -router.post('/task/runninghub', authMiddleware, async (req, res) => { - // 请求体已经是 runninghub 格式,直接使用 - // 调用现有 /api/task 逻辑 -}); -``` - ---- - -### 修改点 4:确保回调兼容 - -**说明:** message-dispatcher 已支持 `webhookUrl` 参数,任务完成后会调用该回调。需要确保回调格式与 runninghub 保持一致。 - -**检查点:** -- 确认 `TASK_END` 消息处理中正确调用 webhookUrl -- 确保回调数据格式与 runninghub 兼容 - ---- - -### 修改点 5:新增任务处理与算力动态调整机制 - -**文件:** `src/task-scheduler/index.js`(新建) - -**核心需求:** -- 当任务队列后端发送 30 个任务,而系统算力突然降低至 20 时,实现任务保留机制 -- 将超出当前算力的任务存入缓存队列 -- 实时监控算力变化,有空闲算力时按 FIFO 取出任务处理 -- 管理缓存任务状态(等待中、处理中、已完成、失败重试等) - -**数据结构设计:** - -| 数据结构 | 类型 | 说明 | -|---------|------|------| -| `pendingTaskQueue` | Array | 待执行任务缓存队列(FIFO) | -| `processingTasks` | Map | 执行中任务 | -| `completedTasks` | List | 已完成任务(最近 1000 条) | -| `failedTasks` | List | 失败任务(最近 100 条) | - -**任务状态定义:** -```javascript -const TASK_STATES = { - PENDING: 'pending', // 等待中 - PROCESSING: 'processing', // 处理中 - COMPLETED: 'completed', // 已完成 - FAILED: 'failed', // 失败 - RETRYING: 'retrying' // 重试中 -}; -``` - -**核心类设计:** -```javascript -class TaskScheduler { - constructor() { - this.pendingTaskQueue = []; // FIFO 队列 - this.processingTasks = new Map(); // taskId -> taskInfo - this.currentCapacity = 0; // 当前可用算力 - this.maxCapacity = 0; // 最大算力 - this.schedulerLoop = null; - } - - // 初始化调度器 - async init() - - // 设置当前可用算力 - setCurrentCapacity(capacity) - - // 添加任务到缓存队列 - addTaskToPending(task) - - // 从缓存队列取出任务 - getTaskFromPending() - - // 将任务标记为处理中 - markTaskAsProcessing(taskId, instanceId) - - // 将任务标记为已完成 - markTaskAsCompleted(taskId, result) - - // 将任务标记为失败 - markTaskAsFailed(taskId, error) - - // 主调度循环 - async schedulerLoop() - - // 检查是否有空闲算力 - hasAvailableCapacity() - - // 获取可用任务数 - getAvailableSlots() - - // 处理算力降低 - handleCapacityReduction(newCapacity) - - // 处理算力增加 - handleCapacityIncrease(newCapacity) -} -``` - -**场景处理示例:** - -**场景 1:算力从 30 降低至 20** -```javascript -async handleCapacityReduction(newCapacity) { - const currentProcessingCount = this.processingTasks.size; - - // 如果处理中的任务数超过新容量 - if (currentProcessingCount > newCapacity) { - const excessCount = currentProcessingCount - newCapacity; - - // 获取最早开始的 excessCount 个任务 - const tasksToMoveBack = Array.from(this.processingTasks.values()) - .sort((a, b) => a.startTime - b.startTime) - .slice(0, excessCount); - - // 将任务移回 pending 队列头部(优先级高) - for (const task of tasksToMoveBack.reverse()) { - this.processingTasks.delete(task.taskId); - this.pendingTaskQueue.unshift({ - ...task, - state: TASK_STATES.PENDING, - movedBackAt: new Date().toISOString() - }); - } - - console.log(`[TaskScheduler] 算力降低: ${this.currentCapacity} -> ${newCapacity}, 已将 ${excessCount} 个任务移回缓存队列`); - } - - this.currentCapacity = newCapacity; -} -``` - -**场景 2:有空闲算力时调度任务** -```javascript -async schedulePendingTasks() { - const availableSlots = this.getAvailableSlots(); - - if (availableSlots <= 0 || this.pendingTaskQueue.length === 0) { - return; - } - - const tasksToSchedule = this.pendingTaskQueue.splice(0, availableSlots); - - for (const task of tasksToSchedule) { - // 分配任务到可用实例 - const instanceId = await this.selectAvailableInstance(); - - this.markTaskAsProcessing(task.taskId, instanceId); - - // 发送任务到实例 - await this.sendTaskToInstance(task, instanceId); - } - - console.log(`[TaskScheduler] 已调度 ${tasksToSchedule.length} 个任务`); -} -``` - -**算力更新监听:** -```javascript -// 监听来自任务队列后端的算力更新(通过 WebSocket) -handleCapacityUpdateFromBackend(data) { - const newCapacity = data.summary.availableCapacity; - - if (newCapacity < this.currentCapacity) { - this.handleCapacityReduction(newCapacity); - } else if (newCapacity > this.currentCapacity) { - this.handleCapacityIncrease(newCapacity); - } else { - this.currentCapacity = newCapacity; - } -} -``` - ---- - -### 修改点 6:集成任务调度器到主流程 - -**文件:** `src/index.js` - -**修改内容:** -- 导入并初始化 TaskScheduler -- 在 WebSocket 客户端收到算力更新时通知调度器 -- 在任务开始/完成时通知调度器更新状态 -- 在服务关闭时优雅关闭调度器 - ---- - -## 三、技术要求 - -### 3.1 代码规范 -- 遵循现有代码风格(ES Module, async/await) -- 保持与 runninghub.js 相同的接口签名 -- 请求体字段名称、类型、格式必须与 runninghub 完全一致 -- 仅修改 apiKey 字段的值为 JWT Token -- 添加充分的日志记录(使用 console.log/console.error) - -### 3.2 错误处理 -- 健康检查失败不应导致主进程崩溃 -- 降级机制必须可靠 -- 超时处理完善 -- JWT Token 更新失败不应中断服务 - -### 3.3 性能要求 -- 健康检查间隔不小于 10 秒 -- 决策时间不超过 100ms -- 不影响现有系统吞吐量 - ---- - -## 四、验收标准 - -### 4.1 功能验收 -- [ ] 任务分流逻辑正确:≤内部容量全部走内部,超出部分走外部 -- [ ] message-dispatcher 不可用时自动降级至 runninghub -- [ ] 请求体与 runninghub 完全一致(仅 apiKey 值不同) -- [ ] JWT Token 自动更新机制正常工作 -- [ ] 任务成功执行并返回正确结果 -- [ ] 回调接口正常工作 -- [ ] 现有功能不受影响(jimuai、coze 等平台正常工作) - -### 4.2 接口兼容性验收 -- [ ] messageDispatcher.js 接口方法签名与 runninghub.js 完全一致 -- [ ] 请求体字段名称、类型、格式与 runninghub 完全一致 -- [ ] 响应格式与 runninghub 兼容 -- [ ] 不修改 runninghub.js 的任何代码 - -### 4.3 容量边界验收 -- [ ] WebSocket 服务正常启动,message-dispatcher 成功连接 -- [ ] JWT Token 成功通过 WebSocket 接收 -- [ ] JWT Token 定期更新通过 WebSocket 接收 -- [ ] 算力状态实时同步(通过 WebSocket) -- [ ] 实例状态变化实时同步(通过 WebSocket) -- [ ] 场景 1:25个任务 → 全部25个走内部(实时容量) -- [ ] 场景 2:30个任务 → 全部30个走内部(实时容量) -- [ ] 场景 3:35个任务 → 30个内部,5个外部(实时容量) -- [ ] 场景 4:40个任务 → 30个内部,10个外部(实时容量) -- [ ] 场景 5:45个任务 → 30个内部,10个外部,5个等待(实时容量) - -### 4.4 日志验收 -- [ ] 记录每次分发决策(使用内部/外部) -- [ ] 记录容量使用统计 -- [ ] 记录降级事件及原因 -- [ ] 记录 JWT Token 接收/更新日志(通过 WebSocket) -- [ ] 记录 WebSocket 连接/断开日志 - -### 4.5 可靠性验收 -- [ ] message-dispatcher 重启后自动恢复 -- [ ] 网络波动不影响降级逻辑 -- [ ] JWT Token 自动更新不中断服务 -- [ ] 连续运行 24 小时无崩溃 - -### 4.6 算力管理验收 -- [ ] 算力计数从不出现负值 -- [ ] 任务完成后算力计数正确增加 -- [ ] 算力从 30 降低到 20 时系统稳定 -- [ ] 无任务时进入睡眠,不空转 -- [ ] 100 次并发更新后计数正确 -- [ ] 单元测试覆盖正常、边界、异常情况 - ---- - -## 五、关键文件参考 - -### 任务队列后端 -- `outside/outPlatforms/runninghub.js` - 参考接口设计 -- `outside/generat.js` - 修改任务分发逻辑 -- `outside/outPlatforms/outside.js` - 平台注册 -- `worker_threads/wait/waiting.js` - 批量任务分流 -- `worker_threads/wait/generatTask.js` - 任务处理流程 - -### message-dispatcher -- `src/api/index.js` - API 接口定义 -- `src/bridge-manager/index.js` - 桥接器管理 -- `src/websocket-server/index.js` - 任务发送逻辑 - ---- - -## 六、实现顺序建议 - -### 任务队列后端实现顺序 -1. **第一步:** 创建 `mdWebSocketServer.js` WebSocket 服务模块 -2. **第二步:** 创建 `taskDistributor.js` 任务分流模块 -3. **第三步:** 创建 `messageDispatcher.js` 平台适配器 -4. **第四步:** 修改 `waiting.js` 实现批量任务分流 -5. **第五步:** 修改 `generat.js` 实现单任务分发决策 -6. **第六步:** 更新 `outside.js` 和配置文件 -7. **第七步:** 检查并修复 `redis/initQueue.js` 算力计数(添加边界检查) -8. **第八步:** 创建 `capacityGuard.js` 算力更新锁 -9. **第九步:** 修改 `waiting.js` 添加空转防御 -10. **第十步:** 创建单元测试 `test/capacity.test.js` -11. **第十一步:** 测试验证功能 - -### message-dispatcher 实现顺序 -1. **第一步:** 创建 `md-websocket-client/index.js` WebSocket 客户端模块 -2. **第二步:** 修改 `src/index.js` 集成 WebSocket 客户端 -3. **第三步:** 创建 `task-scheduler/index.js` 任务调度器 -4. **第四步:** 修改 `src/index.js` 集成任务调度器 -5. **第五步:** 添加 runninghub 兼容接口(可选) -6. **第六步:** 确保回调兼容 -7. **第七步:** 测试验证功能 - ---- - -## 七、注意事项 - -⚠️ **重要提醒:** -1. **不要修改** `runninghub.js` 的现有代码 -2. **保持** `externalPostRequest()` 的返回值格式不变 -3. **确保** 请求体与 runninghub 完全一致(仅 apiKey 值不同) -4. **确保** 回调接口格式与 runninghub 完全一致 -5. **不要** 破坏现有其他平台(jimuai、coze)的功能 -6. **移除** 所有高低优先级任务的区分逻辑 -7. **新增** 日志时使用清晰的前缀,如 `[MessageDispatcher]`、`[JWTManager]`、`[TaskDistributor]` - ---- - -## 八、验证方法 - -### 8.1 接口兼容性验证 - -**验证步骤:** -1. 对比 messageDispatcher.js 与 runninghub.js 的方法签名 -2. 验证 getGenerateBody() 返回的请求体字段名称 -3. 验证 getGenerateBody() 返回的请求体字段类型 -4. 确认仅 apiKey 字段的值被修改 - -**验证代码:** -```javascript -// 对比两个平台适配器的接口 -const runninghubMethods = Object.keys(runninghub); -const messageDispatcherMethods = Object.keys(messageDispatcher); -assert.deepEqual(runninghubMethods, messageDispatcherMethods, '接口方法必须一致'); -``` - -### 8.2 任务分流验证 - -**验证场景:** -1. 构造不同数量的待分发任务 -2. 检查内部/外部任务分配比例 -3. 验证不超过各自容量上限 - ---- - -现在,请按照以上指导开始实现! diff --git a/backend/.env b/backend/.env index 8e496a3..867cac9 100644 --- a/backend/.env +++ b/backend/.env @@ -6,5 +6,11 @@ JWT_SECRET=comfyui-cluster-bridge-secret-key-2024 JWT_EXPIRES_IN=24h ADMIN_USERNAME=admin ADMIN_PASSWORD=2233..2233 -MESSAGE_DISPATCHER_URL=ws://localhost:4000/ws + +MESSAGE_DISPATCHER_URL=wss://www.whjbjm.com/message-dispatcher +INTERNAL_UPLOAD_URL=http://43.134.182.189:9000/api/internal/uploadGeneratedFile +INTERNAL_API_TOKEN=123456/message-dispatcher BRIDGE_ID=bridge-1 +WORKFLOW_RESOURCES_URL=http://117.72.204.159/AIGC/static/public/workflows + +COMFYUI_OUTPUT_DIR=/root/ComfyUI/output \ No newline at end of file diff --git a/backend/config/servers.json b/backend/config/servers.json index c8519c7..c8681e0 100644 --- a/backend/config/servers.json +++ b/backend/config/servers.json @@ -24,6 +24,10 @@ "taskQueue": { "websocketUrl": "ws://localhost:8080/ws" }, + "messageDispatcher": { + "websocketUrl": "wss://www.whjbjm.com/message-dispatcher", + "bridgeId": "bridge-1" + }, "upload": { "url": "https://shuzhiren.xueai.art/upload/file" } diff --git a/backend/src/config/index.js b/backend/src/config/index.js index 9bf5ecf..564f512 100644 --- a/backend/src/config/index.js +++ b/backend/src/config/index.js @@ -56,7 +56,7 @@ class ConfigManager { timeout: 3000 }, messageDispatcher: { - websocketUrl: process.env.MESSAGE_DISPATCHER_URL || 'ws://localhost:4000/ws', + websocketUrl: process.env.MESSAGE_DISPATCHER_URL || 'wss://www.whjbjm.com/message-dispatcher', bridgeId: process.env.BRIDGE_ID || 'bridge-1' }, upload: { @@ -72,6 +72,13 @@ class ConfigManager { * @returns {*} 配置值 */ get(key, defaultValue = null) { + if (key === 'messageDispatcher.websocketUrl') { + return process.env.MESSAGE_DISPATCHER_URL || defaultValue; + } + if (key === 'messageDispatcher.bridgeId') { + return process.env.BRIDGE_ID || defaultValue; + } + const keys = key.split('.'); let value = this.config; for (const k of keys) { diff --git a/backend/src/file-uploader/index.js b/backend/src/file-uploader/index.js index caa1582..1ee245f 100644 --- a/backend/src/file-uploader/index.js +++ b/backend/src/file-uploader/index.js @@ -1,148 +1,61 @@ + /** * file-uploader模块 - 文件上传处理 */ -import multer from 'multer'; -import { v4 as uuidv4 } from 'uuid'; -import path from 'path'; -import fs from 'fs'; -import logger from '../logger/index.js'; -import config from '../config/index.js'; import axios from 'axios'; import FormData from 'form-data'; +import fs from 'fs'; +import path from 'path'; +import logger from '../logger/index.js'; const uploadDir = path.resolve(process.cwd(), 'uploads'); +const INTERNAL_UPLOAD_URL = process.env.INTERNAL_UPLOAD_URL || 'http://43.134.182.189:9000/api/internal/uploadGeneratedFile'; +const INTERNAL_API_TOKEN = process.env.INTERNAL_API_TOKEN || ''; + if (!fs.existsSync(uploadDir)) { fs.mkdirSync(uploadDir, { recursive: true }); } -const storage = multer.diskStorage({ - destination: (req, file, cb) => { - cb(null, uploadDir); - }, - filename: (req, file, cb) => { - const ext = path.extname(file.originalname); - cb(null, `${uuidv4()}${ext}`); - } -}); - -const upload = multer({ - storage, - limits: { - fileSize: 100 * 1024 * 1024 - } -}); - class FileUploader { - constructor() { - this.files = new Map(); - } - - /** - * 获取multer上传中间件 - */ - getUploadMiddleware() { - return upload.single('file'); - } - - /** - * 处理文件上传 - * @param {object} file - 文件对象 - * @returns {object} 文件信息 - */ - async uploadFile(file) { - const fileId = uuidv4(); - const fileInfo = { - id: fileId, - filename: file.originalname, - path: file.path, - size: file.size, - mimetype: file.mimetype, - uploadedAt: new Date().toISOString() - }; - - this.files.set(fileId, fileInfo); - logger.info(`文件已上传: ${fileId} - ${file.originalname}`); - return fileInfo; - } - - /** - * 上传文件到外部服务器 - * @param {string} filePath - 文件路径 - * @param {string} originalName - 原始文件名 - * @returns {object} 上传结果 - */ - async uploadToExternalServer(filePath, originalName) { - const uploadUrl = config.get('upload.url', 'https://shuzhiren.xueai.art/upload/file'); - + async uploadToExternalServer(filePath, originalFilename) { const formData = new FormData(); - formData.append('file', fs.createReadStream(filePath), { - filename: originalName - }); - - const response = await axios.post(uploadUrl, formData, { - headers: formData.getHeaders(), - maxContentLength: Infinity, - maxBodyLength: Infinity - }); - - logger.info(`文件已上传到外部服务器: ${originalName}`); - return response.data; - } - - /** - * 获取文件信息 - * @param {string} fileId - 文件ID - * @returns {object|null} 文件信息 - */ - getFile(fileId) { - return this.files.get(fileId) || null; - } - - /** - * 获取文件列表 - * @returns {Array} 文件列表 - */ - getFiles() { - return Array.from(this.files.values()); - } - - /** - * 删除文件 - * @param {string} fileId - 文件ID - * @returns {boolean} 是否成功 - */ - deleteFile(fileId) { - const fileInfo = this.files.get(fileId); - if (!fileInfo) { - return false; + formData.append('file', fs.createReadStream(filePath), originalFilename); + + const headers = { + 'Content-Type': `multipart/form-data; boundary=${formData.getBoundary()}` + }; + + if (INTERNAL_API_TOKEN) { + headers['Authorization'] = `Bearer ${INTERNAL_API_TOKEN}`; } - - if (fs.existsSync(fileInfo.path)) { - fs.unlinkSync(fileInfo.path); - } - - this.files.delete(fileId); - logger.info(`文件已删除: ${fileId}`); - return true; - } - - /** - * 清理过期文件 - * @param {number} maxAgeHours - 最大保留时间(小时) - */ - cleanupOldFiles(maxAgeHours = 24) { - const now = Date.now(); - const maxAge = maxAgeHours * 60 * 60 * 1000; - - for (const [fileId, fileInfo] of this.files) { - const age = now - new Date(fileInfo.uploadedAt).getTime(); - if (age > maxAge) { - this.deleteFile(fileId); + + try { + logger.info(`正在上传文件到外部服务器: ${INTERNAL_UPLOAD_URL}, 文件名: ${originalFilename}`); + + const response = await axios.post(INTERNAL_UPLOAD_URL, formData, { + headers, + timeout: 60000 + }); + + if (response.data && response.data.code === '0' && response.data.data && response.data.data.url) { + logger.info(`文件上传成功: ${response.data.data.url}`); + return response.data.data; + } else { + logger.error(`文件上传失败: ${JSON.stringify(response.data)}`); + throw new Error(response.data?.msg || '文件上传失败'); + } + } catch (error) { + logger.error('文件上传出错:', error.message); + throw error; + } finally { + if (fs.existsSync(filePath)) { + fs.unlinkSync(filePath); } } } } export default new FileUploader(); + diff --git a/backend/src/task-forwarder/index.js b/backend/src/task-forwarder/index.js index 3ad8226..30a2a53 100644 --- a/backend/src/task-forwarder/index.js +++ b/backend/src/task-forwarder/index.js @@ -1,3 +1,13 @@ +/** + * TaskForwarder - 任务转发器 + * + * 设计说明: + * - clientId 使用实例 ID(固定不变),实现 WebSocket 连接复用 + * - prompt_id 使用 taskId,便于任务追踪和查询 + * - 同一实例的所有任务共享同一个 WebSocket 连接 + * - 通过 prompt_id 区分不同任务的消息 + */ + import { v4 as uuidv4 } from 'uuid'; import logger from '../logger/index.js'; import clusterManager from '../cluster-manager/index.js'; @@ -16,7 +26,10 @@ class TaskForwarder { } setupEventListeners() { + logger.info('[TaskForwarder] 设置事件监听器'); + webSocketClient.on('execution_start', ({ instanceId, promptId }) => { + logger.info(`[TaskForwarder] 收到 execution_start 事件: instanceId=${instanceId}, promptId=${promptId}`); this.handleExecutionStart(instanceId, promptId).catch(err => { logger.error('处理 execution_start 事件失败:', err); }); @@ -29,12 +42,21 @@ class TaskForwarder { }); webSocketClient.on('executed', ({ instanceId, data }) => { + logger.info(`[TaskForwarder] 收到 executed 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); this.handleExecuted(instanceId, data).catch(err => { logger.error('处理 executed 事件失败:', err); }); }); + webSocketClient.on('execution_success', ({ instanceId, data }) => { + logger.info(`[TaskForwarder] 收到 execution_success 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); + this.handleExecutionSuccess(instanceId, data).catch(err => { + logger.error('处理 execution_success 事件失败:', err); + }); + }); + webSocketClient.on('execution_error', ({ instanceId, data }) => { + logger.error(`[TaskForwarder] 收到 execution_error 事件: instanceId=${instanceId}, promptId=${data?.prompt_id}`); this.handleExecutionError(instanceId, data).catch(err => { logger.error('处理 execution_error 事件失败:', err); }); @@ -59,7 +81,7 @@ class TaskForwarder { const task = { id: taskId, - promptId: null, + promptId: taskId, workflow, nodeInfoList, workflowId, @@ -96,115 +118,276 @@ class TaskForwarder { } async sendTaskToInstance(task, instance) { - await webSocketClient.connect(instance.id, instance.wsUrl); + logger.info(`[TaskForwarder] 准备发送任务 ${task.id} 到实例 ${instance.id}`); + logger.info(`[TaskForwarder] 实例信息: ${JSON.stringify({ id: instance.id, wsUrl: instance.wsUrl, apiUrl: instance.apiUrl })}`); - const promptMessage = { + const wsClientId = instance.id; + const wsUrlWithClientId = `${instance.wsUrl}?clientId=${wsClientId}`; + logger.info(`[TaskForwarder] WebSocket URL (clientId=实例ID): ${wsUrlWithClientId}`); + + await webSocketClient.connect(instance.id, wsUrlWithClientId); + + const promptPayload = { prompt: task.workflow, - client_id: task.id + prompt_id: task.id, + client_id: wsClientId, + front_end: "comfy" }; - webSocketClient.send(instance.id, promptMessage); + logger.info(`[TaskForwarder] 发送的 prompt 消息结构: prompt_id=${task.id}, client_id=${wsClientId}, workflow节点数=${Object.keys(task.workflow || {}).length}`); + logger.info(`[TaskForwarder] 通过 HTTP POST /prompt 提交任务到 ${instance.apiUrl}/prompt`); + + try { + const response = await axios.post(`${instance.apiUrl}/prompt`, promptPayload, { + headers: { + 'Content-Type': 'application/json' + }, + timeout: 30000 + }); + + logger.info(`[TaskForwarder] HTTP POST /prompt 响应: ${JSON.stringify(response.data)}`); + + if (response.data?.node_errors && Object.keys(response.data.node_errors).length > 0) { + const nodeErrors = response.data.node_errors; + const errorMessages = []; + + for (const [nodeId, errorInfo] of Object.entries(nodeErrors)) { + if (errorInfo.errors && errorInfo.errors.length > 0) { + for (const err of errorInfo.errors) { + errorMessages.push(`节点 ${nodeId}: ${err.message}${err.details ? ` (${err.details})` : ''}`); + } + } + } + + const fullErrorMessage = `工作流节点错误: ${errorMessages.join('; ')}`; + logger.error(`[TaskForwarder] ${fullErrorMessage}`); + throw new Error(fullErrorMessage); + } + + const returnedPromptId = response.data?.prompt_id || response.data?.promptId; + logger.info(`[TaskForwarder] 任务 ${task.id} 已提交,ComfyUI 返回 prompt_id: ${returnedPromptId}`); + + } catch (error) { + let errorMessage = error.message; + + if (error.response && error.response.data) { + const comfyError = error.response.data; + logger.error(`[TaskForwarder] 错误响应: ${JSON.stringify(comfyError)}`); + + if (comfyError.error && comfyError.error.message) { + errorMessage = comfyError.error.message; + } else if (comfyError.message) { + errorMessage = comfyError.message; + } else if (typeof comfyError === 'string') { + errorMessage = comfyError; + } else { + errorMessage = JSON.stringify(comfyError); + } + logger.error(`[TaskForwarder] 提取的错误信息: ${errorMessage}`); + } + + throw new Error(errorMessage); + } + task.status = 'submitted'; logger.info(`任务 ${task.id} 已发送到实例 ${instance.id}`); } async handleExecutionStart(instanceId, promptId) { - for (const [taskId, task] of this.tasks) { - if (task.instanceId === instanceId && !task.promptId && task.status === 'submitted') { - task.promptId = promptId; - task.status = 'running'; - task.startedAt = new Date().toISOString(); - this.tasks.set(taskId, task); - clusterManager.updateInstanceStatus(instanceId, 'busy'); - logger.info(`任务 ${task.id} 开始执行, promptId: ${promptId}`); - break; - } + logger.info(`[TaskForwarder] handleExecutionStart: instanceId=${instanceId}, promptId=${promptId}`); + + const task = this.tasks.get(promptId); + if (!task) { + logger.warn(`[TaskForwarder] 未找到任务: promptId=${promptId}`); + return; } + + if (task.instanceId !== instanceId) { + logger.warn(`[TaskForwarder] 任务实例不匹配: taskId=${promptId}, task.instanceId=${task.instanceId}, event.instanceId=${instanceId}`); + return; + } + + if (task.status !== 'submitted') { + logger.warn(`[TaskForwarder] 任务状态不正确: taskId=${promptId}, status=${task.status}`); + return; + } + + task.status = 'running'; + task.startedAt = new Date().toISOString(); + this.tasks.set(promptId, task); + clusterManager.updateInstanceStatus(instanceId, 'busy'); + logger.info(`任务 ${task.id} 开始执行`); } async handleProgress(instanceId, data) { - for (const [taskId, task] of this.tasks) { - if (task.instanceId === instanceId && task.status === 'running') { - if (data.max && data.max > 0) { - task.progress = Math.round((data.value / data.max) * 100); - this.tasks.set(taskId, task); - } - break; - } + if (!data?.prompt_id) { + return; + } + + const task = this.tasks.get(data.prompt_id); + if (!task || task.instanceId !== instanceId) { + return; + } + + if (data.max && data.max > 0) { + task.progress = Math.round((data.value / data.max) * 100); + this.tasks.set(data.prompt_id, task); + logger.info(`[TaskForwarder] 任务 ${data.prompt_id} 进度: ${task.progress}%`); } } async handleExecuted(instanceId, data) { - for (const [taskId, task] of this.tasks) { - if (task.promptId === data.prompt_id && task.status === 'running') { - task.status = 'completed'; - task.completedAt = new Date().toISOString(); - task.result = data; - this.tasks.set(taskId, task); - clusterManager.updateInstanceStatus(instanceId, 'online'); - logger.info(`任务 ${task.id} 执行完成`); - - if (task.webhookUrl) { - await this.sendWebhookCallback(task, data, null); - } - - if (task.queueTaskId) { - const resultData = await this.processResultData(data, instanceId); - taskQueueClient.notifyTaskComplete(task.queueTaskId, resultData); - } - - break; - } + logger.info(`[TaskForwarder] handleExecuted: instanceId=${instanceId}, promptId=${data?.prompt_id}, node=${data?.node}`); + + if (!data?.output) { + logger.info(`[TaskForwarder] executed 消息无输出,跳过处理`); + return; } + + const promptId = data.prompt_id; + const task = this.tasks.get(promptId); + if (!task || task.instanceId !== instanceId) { + return; + } + + if (!task.partialResults) { + task.partialResults = []; + } + task.partialResults.push(data); + logger.info(`[TaskForwarder] 收集节点 ${data.node} 的输出结果`); } async handleExecutionError(instanceId, data) { - for (const [taskId, task] of this.tasks) { - if (task.promptId === data.prompt_id && task.status === 'running') { - task.status = 'failed'; + logger.error(`[TaskForwarder] handleExecutionError: instanceId=${instanceId}, promptId=${data?.prompt_id}`); + + const promptId = data.prompt_id; + const task = this.tasks.get(promptId); + if (!task || task.instanceId !== instanceId) { + return; + } + + task.status = 'failed'; + task.completedAt = new Date().toISOString(); + task.error = data.exception_message || data.error || JSON.stringify(data); + this.tasks.set(promptId, task); + clusterManager.updateInstanceStatus(instanceId, 'online'); + logger.error(`任务 ${task.id} 执行失败: ${task.error}`); + + if (task.webhookUrl) { + await this.sendWebhookCallback(task, null, task.error); + } + + if (task.queueTaskId) { + taskQueueClient.notifyTaskComplete(task.queueTaskId, null, task.error); + } + } + + async handleExecutionSuccess(instanceId, data) { + logger.info(`[TaskForwarder] handleExecutionSuccess: instanceId=${instanceId}, promptId=${data?.prompt_id}`); + + const promptId = data.prompt_id; + const task = this.tasks.get(promptId); + if (!task) { + logger.warn(`[TaskForwarder] 未找到任务: promptId=${promptId}`); + return; + } + + if (task.instanceId !== instanceId) { + logger.warn(`[TaskForwarder] 任务实例不匹配: taskId=${promptId}`); + return; + } + + logger.info(`[TaskForwarder] 找到匹配的任务: ${promptId}, 准备获取结果`); + + const instance = clusterManager.getInstance(instanceId); + if (!instance) { + logger.error(`[TaskForwarder] 无法找到实例: ${instanceId}`); + return; + } + + try { + const historyResponse = await axios.get(`${instance.apiUrl}/history/${promptId}`, { + timeout: 10000 + }); + + const historyData = historyResponse.data; + logger.info(`[TaskForwarder] 获取到历史记录`); + + let outputs = null; + if (historyData && historyData[promptId] && historyData[promptId].outputs) { + outputs = historyData[promptId].outputs; + } else if (historyData && historyData.outputs) { + outputs = historyData.outputs; + } + + if (outputs) { + const resultData = await this.processHistoryOutputs(outputs, instanceId); + + task.status = 'completed'; task.completedAt = new Date().toISOString(); - task.error = data.exception_message; - this.tasks.set(taskId, task); + task.result = outputs; + this.tasks.set(promptId, task); clusterManager.updateInstanceStatus(instanceId, 'online'); - logger.error(`任务 ${task.id} 执行失败: ${data.exception_message}`); + logger.info(`任务 ${task.id} 执行完成,结果数量: ${resultData.length}`); if (task.webhookUrl) { - await this.sendWebhookCallback(task, null, data.exception_message); + await this.sendWebhookCallback(task, { output: outputs }, null); } if (task.queueTaskId) { - taskQueueClient.notifyTaskComplete(task.queueTaskId, null, data.exception_message); + taskQueueClient.notifyTaskComplete(task.queueTaskId, resultData); } - - break; + } else { + logger.warn(`[TaskForwarder] 历史记录中没有 outputs`); + } + } catch (error) { + logger.error(`[TaskForwarder] 获取历史记录失败: ${error.message}`); + task.status = 'failed'; + task.error = `获取结果失败: ${error.message}`; + this.tasks.set(promptId, task); + + if (task.webhookUrl) { + await this.sendWebhookCallback(task, null, task.error); + } + + if (task.queueTaskId) { + taskQueueClient.notifyTaskComplete(task.queueTaskId, null, task.error); } } } - async processResultData(data, instanceId) { + async processHistoryOutputs(outputs, instanceId) { const resultData = []; - if (data.output) { - const instance = clusterManager.getInstance(instanceId); - if (!instance) { - return resultData; - } + if (!outputs) { + return resultData; + } + + const instance = clusterManager.getInstance(instanceId); + if (!instance) { + logger.error(`[processHistoryOutputs] 无法找到实例: ${instanceId}`); + return resultData; + } - for (const [nodeId, output] of Object.entries(data.output)) { - if (output.images) { - for (const image of output.images) { - try { - const fileUrl = await this.uploadImage(image, instance); - resultData.push({ - fileUrl, - fileType: image.type || 'png', - taskCostTime: 0, - nodeId - }); - } catch (error) { - logger.error('上传图片失败:', error); - } - } + for (const [nodeId, output] of Object.entries(outputs)) { + if (!output) continue; + + const mediaFiles = output.images || output.gifs || []; + logger.info(`[processHistoryOutputs] 节点 ${nodeId} 找到 ${mediaFiles.length} 个媒体文件`); + + for (const media of mediaFiles) { + try { + logger.info(`[processHistoryOutputs] 正在上传文件: ${media.filename}`); + const fileUrl = await this.uploadImage(media, instance); + logger.info(`[processHistoryOutputs] 文件上传成功: ${fileUrl}`); + resultData.push({ + fileUrl, + fileType: media.type || 'png', + taskCostTime: 0, + nodeId + }); + } catch (error) { + logger.error('[processHistoryOutputs] 上传文件失败:', error); } } } @@ -213,6 +396,23 @@ class TaskForwarder { } async uploadImage(image, instance) { + const comfyuiOutputDir = process.env.COMFYUI_OUTPUT_DIR; + + if (comfyuiOutputDir) { + let localFilePath = path.join(comfyuiOutputDir, image.filename); + if (image.subfolder) { + localFilePath = path.join(comfyuiOutputDir, image.subfolder, image.filename); + } + + if (fs.existsSync(localFilePath)) { + logger.info(`从本地目录读取文件: ${localFilePath}`); + const uploadResult = await fileUploader.uploadToExternalServer(localFilePath, image.filename); + return uploadResult.url || uploadResult.data?.url; + } else { + logger.warn(`本地文件不存在: ${localFilePath}, 回退到 HTTP API`); + } + } + const imageUrl = `${instance.apiUrl}/view?filename=${image.filename}&subfolder=${image.subfolder || ''}&type=${image.type}`; const response = await axios.get(imageUrl, { @@ -247,7 +447,7 @@ class TaskForwarder { data: [] }); } else { - const processedData = await this.processResultData(resultData, task.instanceId); + const processedData = await this.processHistoryOutputs(resultData?.output, task.instanceId); eventData = JSON.stringify({ code: 0, msg: 'success', @@ -311,6 +511,25 @@ class TaskForwarder { logger.info(`任务 ${taskId} 已取消`); return true; } + + async getTaskStatus(taskId) { + const task = this.tasks.get(taskId); + if (!task) { + return null; + } + + return { + id: task.id, + promptId: task.promptId, + status: task.status, + progress: task.progress, + instanceId: task.instanceId, + createdAt: task.createdAt, + startedAt: task.startedAt, + completedAt: task.completedAt, + error: task.error + }; + } } export default new TaskForwarder(); diff --git a/backend/src/task-queue-client/index.js b/backend/src/task-queue-client/index.js index f037a82..0873705 100644 --- a/backend/src/task-queue-client/index.js +++ b/backend/src/task-queue-client/index.js @@ -3,6 +3,8 @@ import logger from '../logger/index.js'; import config from '../config/index.js'; import clusterManager from '../cluster-manager/index.js'; import taskForwarder from '../task-forwarder/index.js'; +import comfyUIMonitor from '../comfyui-monitor/index.js'; +import workflowConverter from '../workflow-converter/index.js'; import { v4 as uuidv4 } from 'uuid'; import axios from 'axios'; import EventEmitter from 'events'; @@ -23,6 +25,12 @@ class MessageDispatcherClient extends EventEmitter { this.handleConfigChange(); }); this.startHeartbeatInterval(); + + comfyUIMonitor.on('connectionStateChange', () => { + if (this.isConnected) { + this.sendRegisterMessage(); + } + }); } startHeartbeatInterval() { @@ -32,7 +40,7 @@ class MessageDispatcherClient extends EventEmitter { } getWebSocketUrl() { - return config.get('messageDispatcher.websocketUrl', 'ws://localhost:4000/ws'); + return config.get('messageDispatcher.websocketUrl', 'wss://www.whjbjm.com/message-dispatcher'); } connect() { @@ -165,6 +173,7 @@ class MessageDispatcherClient extends EventEmitter { async handleMessage(data) { try { const message = JSON.parse(data.toString()); + console.log('[后端] 收到消息:', JSON.stringify(message, null, 2)); logger.debug('[MessageDispatcher] 收到消息:', message.type); switch (message.type) { @@ -236,15 +245,16 @@ class MessageDispatcherClient extends EventEmitter { } async handleTaskAssign(taskData) { - const { workflowId, nodeInfoList, webhookUrl, requestId } = taskData; - const taskId = uuidv4(); + const { workflowId, nodeInfoList, webhookUrl, requestId, instanceId } = taskData; + const taskId = requestId; - logger.info(`[MessageDispatcher] 收到任务: ${workflowId}, 生成taskId: ${taskId}`); + logger.info(`[MessageDispatcher] 收到任务: ${workflowId}, 使用requestId作为taskId: ${taskId}, 指定实例: ${instanceId}`); const ackResponse = { type: 'TASK_ACK', data: { requestId, + instanceId, code: 0, msg: 'success', data: { @@ -267,10 +277,15 @@ class MessageDispatcherClient extends EventEmitter { this.pendingTasks.set(taskId, taskRecord); try { + const workflow = await workflowConverter.convert(workflowId, nodeInfoList); + const actualTaskId = await taskForwarder.submitTask( - {}, + workflow, nodeInfoList, - workflowId + workflowId, + instanceId, + webhookUrl, + taskId ); taskRecord.status = 'running'; diff --git a/backend/src/websocket-client/index.js b/backend/src/websocket-client/index.js index bff131c..e0a0ab7 100644 --- a/backend/src/websocket-client/index.js +++ b/backend/src/websocket-client/index.js @@ -1,5 +1,10 @@ /** * websocket-client模块 - 与ComfyUI实例的WebSocket通信 + * + * 设计说明: + * - clientId 使用实例 ID(固定不变),实现连接复用 + * - 同一实例的所有任务共享同一个 WebSocket 连接 + * - 通过 prompt_id 区分不同任务的消息 */ import WebSocket from 'ws'; @@ -13,29 +18,30 @@ class WebSocketClient extends EventEmitter { this.connections = new Map(); } - /** - * 连接到指定实例 - * @param {string} instanceId - 实例ID - * @param {string} wsUrl - WebSocket地址 - * @returns {Promise} WebSocket连接 - */ connect(instanceId, wsUrl) { return new Promise((resolve, reject) => { - if (this.connections.has(instanceId)) { - const conn = this.connections.get(instanceId); - if (conn.readyState === WebSocket.OPEN) { - resolve(conn); - return; - } + const existingConn = this.connections.get(instanceId); + + if (existingConn && existingConn.ws && existingConn.ws.readyState === WebSocket.OPEN) { + logger.info(`[WebSocketClient] 实例 ${instanceId} 已有连接,直接复用`); + resolve(existingConn.ws); + return; + } + + if (existingConn && existingConn.ws) { + logger.info(`[WebSocketClient] 关闭旧连接,重新连接`); + existingConn.ws.close(); + this.connections.delete(instanceId); } logger.info(`正在连接到实例 ${instanceId}: ${wsUrl}`); + logger.info(`[WebSocketClient] 连接详情: instanceId=${instanceId}, wsUrl=${wsUrl}`); const ws = new WebSocket(wsUrl); ws.on('open', () => { logger.info(`成功连接到实例 ${instanceId}`); - this.connections.set(instanceId, ws); + this.connections.set(instanceId, { ws, wsUrl }); const stateChange = comfyUIMonitor.setInstanceState(instanceId, 'connected'); if (stateChange) { @@ -92,79 +98,82 @@ class WebSocketClient extends EventEmitter { }); } - /** - * 处理收到的消息 - * @param {string} instanceId - 实例ID - * @param {object} message - 消息对象 - */ handleMessage(instanceId, message) { + if (message.type !== 'progress_state' && message.type !== 'progress') { + logger.info(`[WebSocketClient] 收到消息 from ${instanceId}: type=${message.type}, data=${JSON.stringify(message.data || {}).substring(0, 200)}`); + } this.emit('message', { instanceId, message }); switch (message.type) { case 'status': + logger.info(`[WebSocketClient] status 消息: ${JSON.stringify(message.data)}`); this.emit('status', { instanceId, status: message.data }); break; case 'progress': - this.emit('progress', { instanceId, data: message.data }); break; case 'execution_start': + logger.info(`[WebSocketClient] execution_start 消息: prompt_id=${message.data?.prompt_id}`); this.emit('execution_start', { instanceId, promptId: message.data.prompt_id }); break; case 'execution_cached': + logger.info(`[WebSocketClient] execution_cached 消息: ${JSON.stringify(message.data)}`); this.emit('execution_cached', { instanceId, data: message.data }); break; + case 'executing': + logger.info(`[WebSocketClient] executing 消息: node=${message.data?.node}, prompt_id=${message.data?.prompt_id}`); + this.emit('executing', { instanceId, data: message.data }); + break; case 'executed': + logger.info(`[WebSocketClient] executed 消息: prompt_id=${message.data?.prompt_id}`); this.emit('executed', { instanceId, data: message.data }); break; + case 'execution_success': + logger.info(`[WebSocketClient] execution_success 消息: prompt_id=${message.data?.prompt_id}`); + this.emit('execution_success', { instanceId, data: message.data }); + break; case 'execution_error': + logger.error(`[WebSocketClient] execution_error 消息: ${JSON.stringify(message.data)}`); this.emit('execution_error', { instanceId, data: message.data }); break; + case 'progress_state': + break; + default: + logger.info(`[WebSocketClient] 未处理的消息类型: ${message.type}`); } } - /** - * 发送消息到指定实例 - * @param {string} instanceId - 实例ID - * @param {object} message - 消息对象 - */ send(instanceId, message) { - const ws = this.connections.get(instanceId); + const conn = this.connections.get(instanceId); + const ws = conn?.ws; if (!ws || ws.readyState !== WebSocket.OPEN) { + logger.error(`[WebSocketClient] 实例 ${instanceId} 未连接,无法发送消息`); throw new Error(`实例 ${instanceId} 未连接`); } - ws.send(JSON.stringify(message)); + const messageStr = JSON.stringify(message); + logger.info(`[WebSocketClient] 发送消息到实例 ${instanceId}: ${messageStr.substring(0, 500)}${messageStr.length > 500 ? '...' : ''}`); + ws.send(messageStr); } - /** - * 断开指定实例的连接 - * @param {string} instanceId - 实例ID - */ disconnect(instanceId) { - const ws = this.connections.get(instanceId); - if (ws) { - ws.close(); + const conn = this.connections.get(instanceId); + if (conn && conn.ws) { + conn.ws.close(); this.connections.delete(instanceId); } } - /** - * 断开所有连接 - */ disconnectAll() { - for (const [instanceId, ws] of this.connections) { - ws.close(); + for (const [instanceId, conn] of this.connections) { + if (conn.ws) { + conn.ws.close(); + } } this.connections.clear(); } - /** - * 检查实例是否已连接 - * @param {string} instanceId - 实例ID - * @returns {boolean} 连接状态 - */ isConnected(instanceId) { - const ws = this.connections.get(instanceId); - return ws && ws.readyState === WebSocket.OPEN; + const conn = this.connections.get(instanceId); + return conn?.ws && conn.ws.readyState === WebSocket.OPEN; } } diff --git a/backend/src/workflow-converter/index.js b/backend/src/workflow-converter/index.js new file mode 100644 index 0000000..9c4f70e --- /dev/null +++ b/backend/src/workflow-converter/index.js @@ -0,0 +1,201 @@ +import axios from 'axios'; +import logger from '../logger/index.js'; + +class WorkflowConverter { + constructor() { + this.baseUrl = process.env.WORKFLOW_RESOURCES_URL || 'http://117.72.204.159/AIGC/static/public/workflows'; + this.cache = new Map(); + this.cacheTimeout = 5 * 60 * 1000; + } + + async getWorkflowTemplate(workflowId) { + if (!workflowId) { + throw new Error('workflowId is required'); + } + + const cached = this.cache.get(workflowId); + if (cached && Date.now() - cached.timestamp < this.cacheTimeout) { + logger.debug(`使用缓存的 workflow 模板: ${workflowId}`); + return cached.data; + } + + try { + const url = `${this.baseUrl}/${workflowId}.json`; + logger.info(`正在下载 workflow 模板: ${url}`); + + const response = await axios.get(url, { + timeout: 10000 + }); + + const workflowData = response.data; + + this.cache.set(workflowId, { + data: workflowData, + timestamp: Date.now() + }); + + logger.info(`workflow 模板下载成功: ${workflowId}`); + return workflowData; + } catch (error) { + logger.error(`下载 workflow 模板失败: ${workflowId}`, error.message); + throw new Error(`无法获取 workflow 模板: ${workflowId}, 错误: ${error.message}`); + } + } + + generateRandomSeed() { + return Math.floor(Math.random() * 2147483647) + 1; + } + + findControlAfterGenerateNode(nodeInfoList) { + return nodeInfoList.find(node => node.fieldName === 'control_after_generate'); + } + + findSeedNodeByNodeId(nodeInfoList, nodeId) { + return nodeInfoList.find(node => node.nodeId === nodeId && node.fieldName === 'seed'); + } + + processSeedsInWorkflow(workflow, nodeInfoList) { + const controlAfterGenerateNode = this.findControlAfterGenerateNode(nodeInfoList); + const updatedWorkflow = JSON.parse(JSON.stringify(workflow)); + + const nodesWithExplicitSeed = new Set(); + + if (!controlAfterGenerateNode) { + logger.info('[WorkflowConverter] 未找到 control_after_generate 节点,为所有 seed 生成随机值'); + this.setRandomSeedsForAll(updatedWorkflow); + } else { + const nodeId = controlAfterGenerateNode.nodeId; + const controlValue = controlAfterGenerateNode.fieldValue; + + logger.info(`[WorkflowConverter] 找到 control_after_generate 节点: nodeId=${nodeId}, value=${controlValue}`); + + if (controlValue !== 'fixed') { + if (updatedWorkflow[nodeId] && updatedWorkflow[nodeId].inputs) { + const randomSeed = this.generateRandomSeed(); + updatedWorkflow[nodeId].inputs.seed = randomSeed; + nodesWithExplicitSeed.add(nodeId); + logger.info(`[WorkflowConverter] 为节点 ${nodeId} 的 seed 设置随机值: ${randomSeed}`); + } + } else { + const seedNode = this.findSeedNodeByNodeId(nodeInfoList, nodeId); + if (seedNode && seedNode.fieldValue !== undefined) { + const seedValue = parseInt(seedNode.fieldValue, 10); + if (!isNaN(seedValue)) { + if (updatedWorkflow[nodeId] && updatedWorkflow[nodeId].inputs) { + updatedWorkflow[nodeId].inputs.seed = seedValue; + nodesWithExplicitSeed.add(nodeId); + logger.info(`[WorkflowConverter] 为节点 ${nodeId} 的 seed 设置固定值: ${seedValue}`); + } + } + } + } + + this.setRandomSeedsForOthers(updatedWorkflow, nodesWithExplicitSeed); + } + + return updatedWorkflow; + } + + setRandomSeedsForAll(workflow) { + for (const [nodeId, node] of Object.entries(workflow)) { + if (node.inputs && typeof node.inputs === 'object') { + if ('seed' in node.inputs) { + const randomSeed = this.generateRandomSeed(); + node.inputs.seed = randomSeed; + logger.info(`[WorkflowConverter] 为节点 ${nodeId} 的 seed 设置随机值: ${randomSeed}`); + } + } + } + } + + setRandomSeedsForOthers(workflow, nodesWithExplicitSeed) { + for (const [nodeId, node] of Object.entries(workflow)) { + if (nodesWithExplicitSeed.has(nodeId)) { + continue; + } + + if (node.inputs && typeof node.inputs === 'object') { + if ('seed' in node.inputs) { + const randomSeed = this.generateRandomSeed(); + node.inputs.seed = randomSeed; + logger.info(`[WorkflowConverter] 为其他节点 ${nodeId} 的 seed 设置随机值: ${randomSeed}`); + } + } + } + } + + applyNodeUpdates(workflow, nodeInfoList = []) { + if (!workflow) { + throw new Error('workflow is required'); + } + + const updatedWorkflow = JSON.parse(JSON.stringify(workflow)); + + if (!nodeInfoList || !Array.isArray(nodeInfoList) || nodeInfoList.length === 0) { + return updatedWorkflow; + } + + logger.info(`应用 ${nodeInfoList.length} 个节点更新`); + + for (const nodeInfo of nodeInfoList) { + let nodeId, inputs; + + if (nodeInfo.nodeId && nodeInfo.fieldName && nodeInfo.fieldValue !== undefined) { + nodeId = nodeInfo.nodeId; + inputs = { [nodeInfo.fieldName]: nodeInfo.fieldValue }; + logger.info(`[WorkflowConverter] 检测到格式1: nodeId=${nodeId}, fieldName=${nodeInfo.fieldName}`); + } else if (nodeInfo.nodeId && nodeInfo.inputs) { + nodeId = nodeInfo.nodeId; + inputs = nodeInfo.inputs; + logger.info(`[WorkflowConverter] 检测到格式2: nodeId=${nodeId}, inputs=${Object.keys(inputs).join(',')}`); + } else { + logger.warn(`无效的节点信息,跳过: ${JSON.stringify(nodeInfo)}`); + continue; + } + + if (updatedWorkflow[nodeId]) { + if (!updatedWorkflow[nodeId].inputs) { + updatedWorkflow[nodeId].inputs = {}; + } + + for (const [key, value] of Object.entries(inputs)) { + updatedWorkflow[nodeId].inputs[key] = value; + } + + logger.debug(`节点 ${nodeId} 已更新: ${JSON.stringify(inputs)}`); + } else { + logger.warn(`节点 ${nodeId} 在 workflow 中不存在,跳过`); + } + } + + return updatedWorkflow; + } + + async convert(workflowId, nodeInfoList = []) { + logger.info(`[WorkflowConverter] 开始转换 workflow: workflowId=${workflowId}, nodeInfoList长度=${nodeInfoList?.length || 0}`); + + const workflowTemplate = await this.getWorkflowTemplate(workflowId); + logger.info(`[WorkflowConverter] 获取到的 workflow 模板节点数: ${Object.keys(workflowTemplate || {}).length}`); + + let finalWorkflow = this.applyNodeUpdates(workflowTemplate, nodeInfoList); + + finalWorkflow = this.processSeedsInWorkflow(finalWorkflow, nodeInfoList); + + logger.info(`[WorkflowConverter] 应用节点更新后的 workflow 节点数: ${Object.keys(finalWorkflow || {}).length}`); + + logger.info(`workflow 转换完成: ${workflowId}`); + return finalWorkflow; + } + + clearCache() { + this.cache.clear(); + logger.info('workflow 缓存已清空'); + } + + clearCacheByWorkflowId(workflowId) { + this.cache.delete(workflowId); + logger.info(`workflow 缓存已清空: ${workflowId}`); + } +} + +export default new WorkflowConverter(); diff --git a/frontend/.env b/frontend/.env index 93915fc..d7c7dbc 100644 --- a/frontend/.env +++ b/frontend/.env @@ -1,3 +1,3 @@ # 默认环境配置 -VITE_API_BASE_URL=https://a6848e23804d4315b56a48b456ee83ab.pvt.hz.smartml.cn/api -VITE_MESSAGE_DISPATCHER_BASE_URL=http://localhost:4000 +VITE_API_BASE_URL=http://localhost:8079 +VITE_MESSAGE_DISPATCHER_BASE_URL=http://localhost:8078 diff --git a/frontend/.env.development b/frontend/.env.development index 93915fc..d7c7dbc 100644 --- a/frontend/.env.development +++ b/frontend/.env.development @@ -1,3 +1,3 @@ # 默认环境配置 -VITE_API_BASE_URL=https://a6848e23804d4315b56a48b456ee83ab.pvt.hz.smartml.cn/api -VITE_MESSAGE_DISPATCHER_BASE_URL=http://localhost:4000 +VITE_API_BASE_URL=http://localhost:8079 +VITE_MESSAGE_DISPATCHER_BASE_URL=http://localhost:8078 diff --git a/frontend/.env.production b/frontend/.env.production index 0a20ee7..a51310b 100644 --- a/frontend/.env.production +++ b/frontend/.env.production @@ -1,2 +1,3 @@ # 生产环境配置 -VITE_API_BASE_URL=https://a6848e23804d4315b56a48b456ee83ab.pvt.hz.smartml.cn/api +VITE_API_BASE_URL=http://localhost:8079 +VITE_MESSAGE_DISPATCHER_BASE_URL=http://localhost:8078 diff --git a/frontend/src/layouts/MainLayout.vue b/frontend/src/layouts/MainLayout.vue index a0ac009..419608c 100644 --- a/frontend/src/layouts/MainLayout.vue +++ b/frontend/src/layouts/MainLayout.vue @@ -110,7 +110,7 @@ const handleCommand = async (command) => {