实现一个具有并发数量限制的异步任务调度器,可以规定最大同时运行的任务。
JS实现一个带并发限制的异步调度器Scheduler,保证同时运行的任务最多有两个。完善下面代码的Scheduler类,使以下程序能够正常输出:
class Scheduler {
add(promiseCreator) { ... }
// ...
}
const timeout = time => new Promise(resolve => {
setTimeout(resolve, time);
})
const scheduler = new Scheduler(2);
const addTask = (time,order) => {
scheduler.add(() => timeout(time)).then(()=>console.log(order)))
//scheduler.add(() => timeout(time)) 参数是一个promise,返回一个promise
}
addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// output: 2 3 1 4
整个的完整执行流程:
起始1、2两个任务开始执行
500ms时,2任务执行完毕,输出2,任务3开始执行
800ms时,3任务执行完毕,输出3,任务4开始执行
1000ms时,1任务执行完毕,输出1,此时只剩下4任务在执行
1200ms时,4任务执行完毕,输出4
任务调度器-控制任务的执行,当资源不足时将任务加入等待队列,当资源足够时,将等待队列中的任务取出执行
在调度器中一般会有一个等待队列queue,存放当资源不够时等待执行的任务。
具有并发数据限制,假设通过max设置允许同时运行的任务,还需要count表示当前正在执行的任务数量。
当需要执行一个任务A时,先判断count==max 如果相等说明任务A不能执行,应该被阻塞,阻塞的任务放进queue中,等待任务调度器管理。
如果count<max说明正在执行的任务数没有达到最大容量,那么count++执行任务A,执行完毕后count--。
此时如果queue中有值,说明之前有任务因为并发数量限制而被阻塞,现在count<max,任务调度器会将对头的任务弹出执行。
这里的任务都是异步任务
class Scheduler {
constructor(max) {
// 最大可并发任务数
this.max = max;
// 当前并发任务数
this.count = 0;
// 阻塞的任务队列
this.queue = [];
}
add(promiseCreator){//添加任务
if(this.count<this.max) this.start(promiseCreator); //执行任务
else this.queue.push(promiseCreator); //资源不够放入队列中等待执行
}
async start(promiseCreator){//执行promiseCreator任务,promiseCreator是异步任务
this.count++;
await promiseCreator();//获得当前异步任务的结果
this.count--;
//如果此时队列中有值
if(this.queue.length)
//this.queue.shift()(); 这样直接执行的话,count没有变化
this.start(this.queue.shift()); //使用递归调用执行队列中的函数
}
}
}
利用promise实现让队列的中的任务出来的时候,主动执行start。
实现方法
new一个promise,将promise的resolve参数放入队列,当出队时,执行resolve。将start放入成功的回调(onResolved)中,当resolve执行时,会自动执行start。
class Scheduler {
constructor(max) {
// 最大可并发任务数
this.max = max;
// 当前并发任务数
this.count = 0;
// 阻塞的任务队列
this.queue = [];
}
add(promiseCreator){
if(this.count<this.max) this.start(promiseCreator); //执行任务
else {
new Promise(resolve=>{
this.queue.push(resolve);
// 什么时候执行resolve,什么时候执行下面then中注册的回调
}).then(() => {
this.start(promiseCreator);
})
}
}
async start(promiseCreator){//执行promiseCreator任务,promiseCreator是异步任务
this.count++;
await promiseCreator();//获得当前异步任务的结果
this.count--;
//如果此时队列中有值
if(this.queue.length) this.queue.shift()();
}
}
}
简化一下写法 将add/start函数改造成async和await模式
await只能得到成功的结果
async add(promiseCreator){
if(this.count<this.max) await this.start(promiseCreator); //执行任务
else {
await new Promise(resolve=>{
this.queue.push(resolve);
})
await this.start(promiseCreator);//等获取到上面的promise结果,也就是resolve被调用后执行当前语句
}
}
//简化书写
async add(promiseCreator) { //add是一个异步函数,async修饰的函数返回promise,promise的结果由add函数的执行确定,这里是返回一个成功的promise
if (this.count >= Scheduler.max) {
await new Promise(resolve => {
this.tasks.push(resolve);
});
}
await this.start(promiseCreator); // 注意这里要加await, this.start也是个异步任务
}