• 【JS】实现 Promise 的并发数控制


    前言

    首先 JS 自带两个并发控制的函数 Promise.allPromise.race,前者用来实现并发运行参数列表中的所有 Promise;后者同样并发运行所有 Promise,但是当任意一个 Promise 运行完成函数就会返回:

    async function myFetch() {
      return new Promise(resolve => {
      	setTimeout(resolve, 1000)
      })
    }
    
    // myFetch 并发执行,返回参数列表的 Promise
    Promise.all([myFetch(), myFetch()])
    
    // 返回最先运行完的 myFetch 的结果,只有一个
    Promise.race([myFetch(), myFetch()])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    上面两个方法还不够灵活,无法实现并发数的控制,下面提供两个思路。

    1. Promise.all

    Promise.all能并发执行参数列表中的 Promise,那么我们可以控制参数列表中 Promise 的数量,实现类似并发数控制的功能。

    const tasks = Array.from({length: 10}).map((_, idx) => {   
      return () => new Promise(resolve => {
          setTimeout(() => {
          	console.log(idx, 'done')
          	resolve(idx)
          }, idx * 100)
      })
    })
    
    async function Scheduler(promises, limit) { 
      const results = []
      
      for(let i=0; i<=promises.length; i+=limit) {
      	const tasks = []
      	// 切片 Promise.all 的参数
        for(const promise of promises.slice(i, i + limit)) {
          tasks.push(promise())
    	}
    	// 用 await 以确保后面的任务不会在执行当前任务时执行
        const result = await Promise.all(tasks)
        results.push(...result)
      }
      
      return results
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这种方法的基本思路就是通过 for 循环切片出指定数量的任务,然后 await Promise.all(tasks) 阻塞当前函数的运行。使用这种方法有如下注意事项:

    1. promises 的参数必须是一个包含返回 promise 对象的函数
      这是因为如果直接传入 Promise 对象,这个 Promise 会立即执行,无法达到并发控制的效果。
    2. 后一批并发的任务必须等待前一批任务全部完成才能执行,这是因为 Promise.all 的功能限制。从这一点来讲当任务数量多于并发数时,会产生时间上的浪费。

    手写并发控制类

    这种实现方法下,我们保存一个任务列表,当运行中的任务的数量小于并发数时,从任务列表中取出任务并执行。

    class Scheduler {
    	
    	// 并发运行 tasks,最大并发量由 limit 指定
    	// 每次运行前得先 new 一个实例出来,应该 tasks set 没有重置
    	runningTasks = new Set()
    	fufilledTasks = new Set()
    	
    	run(tasks, limit) {
    		for(const task of tasks) {
    			if(
    				this.runningTasks.size<= limit && 
    				!this.fufilledTasks.has(task)
    			){
    				this.runningTasks.add(task)
    				this.fufilledTasks.add(task)
    				
    				task().then(() => {
    					this.runningTasks.delete(task)
    					this.run(tasks, limit)
    				})
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    这种情况下我们先保存没运行的协程,在那些被运行的协程运行后的 .then 方法中,运行没运行的协程,达到并发控制的目的。
    可以优化的点:

    1. 提供返回值列表
    2. 调用 run 方法之前无需实例化类
  • 相关阅读:
    慢SQL,压垮团队的最后一根稻草!
    抗混叠在微小目标检测中的重要性
    windows用脚本编译qt的项目
    研究生数学建模竞赛——近五年赛题分析以及数据分析类赛题优秀论文分享
    taro3.*中使用 dva 入门级别的哦
    【leetcode】【2022/9/18】827. 最大人工岛
    简析低功耗蓝牙芯片PHY6222/PHY6252 蓝牙锁的应用
    图灵奖得主LeCun指明AI未来的出路在于自主学习,这家公司已踏上征途
    RTC风向标:11月最值得关注的26个热点
    Scala 变量
  • 原文地址:https://blog.csdn.net/qq_39559879/article/details/126043783