73 lines
2.3 KiB
JavaScript
73 lines
2.3 KiB
JavaScript
import { parentPort, Worker } from 'worker_threads'
|
||
import redis from '../../redis/index.js'
|
||
import initQueue from '../../redis/initQueue.js'
|
||
|
||
class QueryThreadPool {
|
||
constructor(size = 1) {
|
||
this.size = size; // 线程池大小,默认为6
|
||
this.workers = []; // 存储worker对象的数组
|
||
this.initWorkers(); // 初始化worker线程
|
||
}
|
||
|
||
initWorkers() {
|
||
// // console.log('初始化线程池')
|
||
for (let i = 0; i < this.size; i++) {
|
||
const worker = new Worker('./worker_threads/assessment/PreproTask.js')
|
||
worker.postMessage({type: 'once',id: i});
|
||
// 为每个 worker 添加事件监听器
|
||
worker.on('message', (message) => {
|
||
if (message.type === 'ok'){
|
||
// console.log(`Worker ${message.id} 已准备就绪`);
|
||
} else{
|
||
parentPort.postMessage(message)
|
||
this.workers[message.id].cont -= 1
|
||
}
|
||
})
|
||
|
||
this.workers.push(
|
||
{
|
||
worker,
|
||
cont: 0,
|
||
}
|
||
)
|
||
}
|
||
}
|
||
|
||
// 将任务发送给任务数最少的线程
|
||
executeTask(message) {
|
||
// // console.log('分配任务给各线程')
|
||
|
||
let minWorkerIndex = 0;
|
||
for (let i = 1; i < this.workers.length; i++) {
|
||
if (this.workers[i].cont < this.workers[minWorkerIndex].cont) {
|
||
minWorkerIndex = i;
|
||
}
|
||
}
|
||
|
||
// 将任务分配给任务数最少的worker
|
||
const minWorker = this.workers[minWorkerIndex];
|
||
// 将message转换为JSON字符串后再存储到Redis,因为Redis的rPush命令要求参数必须是字符串或Buffer类型
|
||
redis.rPush(`${initQueue.prefix}:assessment:${minWorkerIndex}`, message) // message: 前端完全返回
|
||
minWorker.cont += 1
|
||
// // console.log('分配给线程:', minWorkerIndex)
|
||
}
|
||
// 清理资源
|
||
terminate() {
|
||
this.workers.forEach((worker) => {
|
||
worker.worker.terminate();
|
||
});
|
||
}
|
||
}
|
||
|
||
// 创建线程池实例
|
||
const threadPool = new QueryThreadPool(3); // 3个线程的线程池
|
||
|
||
// 监听主线程消息
|
||
parentPort.on('message', async (message) => {
|
||
// // console.log('接收到主线程消息:', message)
|
||
if (message.type === 'submit') {
|
||
// 提交任务给提交线程池
|
||
// // console.log('提交任务给线程池');
|
||
threadPool.executeTask(message.data); // 提交任务
|
||
}
|
||
}) |