• nodejs worker_threads的事件监听问题


    注册事件的时间有效性

    在new Worker之后,worker就已经立即执行了,这样一来,会出现一个问题,就是在new之后,我们注册'messge'事件时,可能worker就已经执行完了,这样一来我们注册的'message'事件就不会被触发。

    验证实例 1

    比如下代码:

    1. const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
    2. exports.multiInsert = function (target) {
    3. if (isMainThread) {
    4. const max = 12
    5. const min = 1
    6. let primes = []
    7. const threadCount = +process.argv[2] || 2
    8. const threads = new Set()
    9. console.log(`Running with ${threadCount} threads...`)
    10. const range = Math.ceil((max - min) / threadCount)
    11. let start = min
    12. for (let i = 0; i < threadCount - 1; i++) {
    13. const myStart = start
    14. threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
    15. start += range
    16. }
    17. threads.add(new Worker(__filename, {
    18. workerData: {
    19. start,
    20. range: range + ((max - min + 1) % threadCount)
    21. }
    22. }))
    23. setTimeout(function () {
    24. for (const worker of threads) {
    25. const workerTmp = worker;//as Worker
    26. workerTmp.on('error', (err) => {
    27. throw err
    28. });
    29. workerTmp.on('exit', () => {
    30. threads.delete(worker)
    31. console.log(`Thread exiting, ${threads.size} running...`)
    32. if (threads.size === 0) {
    33. // console.log(primes.join('\n'))
    34. }
    35. })
    36. workerTmp.on('message', (msg) => {
    37. // console.log(" workerTmp.on('message'")
    38. console.log('onMessage=' + msg);
    39. primes = primes.concat(msg)
    40. })
    41. }
    42. }, 3000);
    43. } else {
    44. target(workerData.start, workerData.range)
    45. }
    46. }
    47. let insert2DB = function (start, range) {
    48. console.log("start=" + start + ",range" + range);
    49. let arr = [];
    50. for (let i = 0; i < range; i++) {
    51. arr[arr.length] = start + i;
    52. }
    53. console.log("insert2DB=" + arr.toString());
    54. parentPort.postMessage(arr)
    55. }
    56. this.multiInsert(insert2DB);

    上述代码中,有两个函数:

    multiInsert()一个负责创建线程,并在子线程环境下执行insert2DB()函数。

    而insert2DB()函数,就是具体的子线程的执行逻辑(执行任务)。

    其中,注意的是:

    1. insert2DB()函数的内容是:得出从start值开始,一次加1,放入数组,加够range次后,返回数组。

    2. multiInsert中,在创建worker后,注册message事件的代码是被setTimeout包裹的,延时执行3秒。也就是说,创建完worker后,过三秒才注册监听事件。

    这时候,运行代码,得到的结果就是:

    只有这5行输出。 

    而在message和exit事件的回调函数中的console.log并没有被触发。

    为了更能说明问题,再增加一项实验,见如下代码:

    验证实例 2

    1. const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
    2. exports.multiInsert = function (target) {
    3. if (isMainThread) {
    4. const max = 12
    5. const min = 1
    6. let primes = []
    7. const threadCount = +process.argv[2] || 2
    8. const threads = new Set()
    9. console.log(`Running with ${threadCount} threads...`)
    10. const range = Math.ceil((max - min) / threadCount)
    11. let start = min
    12. for (let i = 0; i < threadCount - 1; i++) {
    13. const myStart = start
    14. threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
    15. start += range
    16. }
    17. threads.add(new Worker(__filename, {
    18. workerData: {
    19. start,
    20. range: range + ((max - min + 1) % threadCount)
    21. }
    22. }))
    23. setTimeout(function () {
    24. for (const worker of threads) {
    25. const workerTmp = worker;//as Worker
    26. workerTmp.on('error', (err) => {
    27. throw err
    28. });
    29. workerTmp.on('exit', () => {
    30. threads.delete(worker)
    31. console.log(`Thread exiting, ${threads.size} running...`)
    32. if (threads.size === 0) {
    33. // console.log(primes.join('\n'))
    34. }
    35. })
    36. workerTmp.on('message', (msg) => {
    37. // console.log(" workerTmp.on('message'")
    38. console.log('onMessage=' + msg);
    39. primes = primes.concat(msg)
    40. })
    41. }
    42. }, 3000);
    43. } else {
    44. target(workerData.start, workerData.range)
    45. }
    46. }
    47. let insert2DB = function (start, range) {
    48. setTimeout(function () {
    49. console.log("start=" + start + ",range" + range);
    50. let arr = [];
    51. for (let i = 0; i < range; i++) {
    52. arr[arr.length] = start + i;
    53. }
    54. console.log("insert2DB=" + arr.toString());
    55. parentPort.postMessage(arr)
    56. }, 5000);
    57. }
    58. this.multiInsert(insert2DB);

    说明:与实例1的代码不同的是,修改了insert2DB函数,把里面的内容也交给了一个setTimeout,设置延时5秒,比注册逻辑部分晚发生2秒。

    再次执行,打印出运行结果

    打印效果是,先输出‘Running with 2 threads...’,然后延时一会儿,输出‘start=1,range6
    start=7,range6
    onMessage=1,2,3,4,5,6
    onMessage=7,8,9,10,11,12
    insert2DB=1,2,3,4,5,6
    insert2DB=7,8,9,10,11,12
    Thread exiting, 1 running...
    Thread exiting, 0 running...

    ’)

    可以看到,message和exiting事件被触发了。

    以上是自己实验的结果,如有错误,请大家指正!

  • 相关阅读:
    浅谈wor2vec,RNN,LSTM,Transfermer之间的关系
    windows升级新版本mysql
    MyBatis操作数据库的方式(api+注解)
    Vite为啥如此之快
    NumPy 均匀分布模拟及 Seaborn 可视化教程
    这是成功了吗?为什么我找不到文件呢?
    前端数据加解密:保护敏感信息的关键
    python的类/方法引用---人机石头剪刀布游戏
    HarmonyOS开发(一):开发工具起步
    什么是 x10 开发工具?「GitHub 热点速览」
  • 原文地址:https://blog.csdn.net/jfqqqqq/article/details/126221010