• 我没能实现始终在一个线程上运行 task


    前文我们总结了在使用常驻任务实现常驻线程时,应该注意的事项。但是我们最终没有提到如何在处理对于带有异步代码的办法。本篇将接受笔者对于该内容的总结。

    如何识别当前代码跑在什么线程上

    一切开始之前,我们先来使用一种简单的方式来识别当前代码运行在哪种线程上。

    最简单的方式就是打印当前线程名称和线程ID来识别。

    private static void ShowCurrentThread(string work)
    {
    Console.WriteLine($"{work} - {Thread.CurrentThread.Name} - {Thread.CurrentThread.ManagedThreadId}");
    }

    通过这段代码,我们可以非常容易的识别三种不同情况下的线程信息。

    [Test]
    public void ShowThreadMessage()
    {
    new Thread(() => { ShowCurrentThread("Custom thread work"); })
    {
    IsBackground = true,
    Name = "Custom thread"
    }.Start();
    Task.Run(() => { ShowCurrentThread("Task.Run work"); });
    Task.Factory.StartNew(() => { ShowCurrentThread("Task.Factory.StartNew work"); },
    TaskCreationOptions.LongRunning);
    Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    // output
    // Task.Factory.StartNew work - .NET Long Running Task - 17
    // Custom thread work - Custom thread - 16
    // Task.Run work - .NET ThreadPool Worker - 12

    分别为:

    • 自定义线程 Custom thread
    • 线程池线程 .NET ThreadPool Worker
    • 由 Task.Factory.StartNew 创建的新线程 .NET Long Running Task

    因此,结合我们之前昙花线程的例子,我们也可以非常简单的看出线程的切换情况:

    [Test]
    public void ShortThread()
    {
    new Thread(async () =>
    {
    ShowCurrentThread("before await");
    await Task.Delay(TimeSpan.FromSeconds(0.5));
    ShowCurrentThread("after await");
    })
    {
    IsBackground = true,
    Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    // output
    // before await - Custom thread - 16
    // after await - .NET ThreadPool Worker - 6

    我们希望在同一个线程上运行 Task 代码

    之前我们已经知道了,手动创建线程并控制线程的运行,可以确保自己的代码不会于线程池线程产生竞争,从而使得我们的常驻任务能够稳定的触发。

    当时用于演示的错误示例是这样的:

    [Test]
    public void ThreadWaitTask()
    {
    new Thread(async () =>
    {
    ShowCurrentThread("before await");
    Task.Run(() =>
    {
    ShowCurrentThread("inner task");
    }).Wait();
    ShowCurrentThread("after await");
    })
    {
    IsBackground = true,
    Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    // output
    // before await - Custom thread - 16
    // inner task - .NET ThreadPool Worker - 13
    // after await - Custom thread - 16

    这个示例可以明显的看出,中间的部分代码是运行在线程池的。这种做法会在线程池资源紧张的时候,导致我们的常驻任务无法触发。

    因此,我们需要一种方式来确保我们的代码在同一个线程上运行。

    那么接下来我们分析一些想法和效果。

    加配!加配!加配!

    我们已经知道了,实际上,常驻任务不能稳定触发是因为 Task 会在线程池中运行。那么增加线程池的容量自然就是最直接解决高峰的做法。 因此,如果条件允许的话,直接增加 CPU 核心数实际上是最为有效和简单的方式。

    不过这种做法并不适用于一些类库的编写者。比如,你在编写日志类库,那么其实无法欲知用户所处的环境。并且正如大家所见,市面上几乎没有日志类库中由说明让用户只能在一定的 CPU 核心数下使用。

    因此,如果您的常驻任务是在类库中,那么我们需要一种更为通用的方式来解决这个问题。

    考虑使用同步重载

    在 Task 出现之后,很多时候我们都会考虑使用异步重载的方法。这显然不是错误的做法,因为这可以使得我们的代码更加高效,提升系统的吞吐量。但是,如果你想要让 Thread 稳定的在同一个线程上运行,那么你需要考虑使用同步重载的方法。通过同步重载方法,我们的代码将不会出现线程切换到线程池的情况。自然也就实现了我们的目的。

    总是使用 TaskCreationOptions.LongRunning

    这个办法其实很不实际。因为任何一层没有指定,都会将任务切换到线程池中。

    [Test]
    public void AlwaysLogRunning()
    {
    new Thread(async () =>
    {
    ShowCurrentThread("before await");
    Task.Factory.StartNew(() =>
    {
    ShowCurrentThread("LongRunning task");
    Task.Run(() => { ShowCurrentThread("inner task"); }).Wait();
    }, TaskCreationOptions.LongRunning).Wait();
    ShowCurrentThread("after await");
    })
    {
    IsBackground = true,
    Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    // output
    // before await - Custom thread - 16
    // LongRunning task - .NET Long Running Task - 17
    // inner task - .NET ThreadPool Worker - 7
    // after await - Custom thread - 16

    所以说,这个办法可以用。但其实很怪。

    自定义 Scheduler

    这是一种可行,但是非常困难的做法。虽然说自定义个简单的 Scheduler 也不是很难,只需要实现几个简单的方法。但要按照我们的需求来实现这个 Scheduler 并不简单。

    比如我们尝试实现一个这样的 Scheduler:

    注意:这个 Scheduler 并不能正常工作。

    class MyScheduler : TaskScheduler
    {
    private readonly Thread _thread;
    private readonly ConcurrentQueue _tasks = new();
    public MyScheduler()
    {
    _thread = new Thread(() =>
    {
    while (true)
    {
    while (_tasks.TryDequeue(out var task))
    {
    TryExecuteTask(task);
    }
    }
    })
    {
    IsBackground = true,
    Name = "MyScheduler"
    };
    _thread.Start();
    }
    protected override IEnumerable GetScheduledTasks()
    {
    return _tasks;
    }
    protected override void QueueTask(Task task)
    {
    _tasks.Enqueue(task);
    }
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
    return false;
    }
    }

    上面的代码中,我们期待通过一个单一的线程来执行所有的任务。但实际上它反而是一个非常简单的死锁演示装置。

    我们设想运行下面这段代码:

    [Test]
    public async Task TestLongRunningConfigureAwait()
    {
    var scheduler = new MyScheduler();
    await Task.Factory.StartNew(() =>
    {
    ShowCurrentThread("BeforeWait");
    Task.Factory
    .StartNew(() =>
    {
    ShowCurrentThread("AfterWait");
    }
    , CancellationToken.None, TaskCreationOptions.None, scheduler)
    .Wait();
    ShowCurrentThread("AfterWait");
    }, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }

    这段代码中,我们期待,在一个 Task 中运行另外一个 Task。但实际上,这段代码会死锁。

    因为,我们的 MyScheduler 中,我们在一个死循环中,不断的从队列中取出任务并执行。但是,我们的任务中,又会调用 Wait 方法。

    我们不妨设想这个线程就是我们自己。

    1. 首先,老板交代给你一件任务,你把它放到队列中。
    2. 然后你开始执行这件任务,执行到一半发现,你需要等待第二件任务的执行结果。因此你在这里等着。
    3. 但是第二件任务这个时候也塞到了你的队列中。
    4. 这下好了,你手头的任务在等待你队列里面的任务完成。而你队列的任务只有你才能完成。
    5. 完美卡死。

    因此,其实实际上我们需要在 Wait 的时候通知当前线程,此时线程被 Block 了,然后转而从队列中取出任务执行。在 Task 于 ThreadPool 的配合中,是存在这样的机制的。但是,我们自己实现的 MyScheduler 并不能与 Task 产生这种配合。因此需要考虑自定义一个 Task。跟进一步说,我们需要自定义 AsyncMethodBuilder 来实现全套的自定义。

    显然者是一项相对高级内容,期待了解的读者,可以通过 UniTask1 项目来了解如何实现这样的全套自定义。

    总结

    如果你期望在常驻线程能够稳定的运行你的任务。那么:

    1. 加配,以避免线程池不够用
    2. 考虑在这部分代码中使用同步代码
    3. 可以学习自定义 Task 系统

    参考

    感谢阅读,如果觉得本文有用,不妨点击推荐👍或者在评论区留下 Mark,让更多的人可以看到。

    欢迎关注作者的微信公众号“newbe技术专栏”,获取更多技术内容。 关注微信公众号“newbe技术专栏”


    1. https://github.com/Cysharp/UniTask

    2. https://www.cnblogs.com/eventhorizon/p/15912383.html

    3. https://threads.whuanle.cn/3.task/

    4. https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcreationoptions?view=net-7.0&WT.mc_id=DX-MVP-5003606

    5. https://www.newbe.pro/Others/0x026-This-is-the-wrong-way-to-use-LongRunnigTask-in-csharp/

    6. https://www.newbe.pro/Others/0x027-error-when-using-async-with-thread/

    7. https://www.newbe.pro/Others/0x028-avoid-return-to-threadpool-in-longrunning-task

  • 相关阅读:
    图 Dijkstra / Bellman-Ford / Floyd-Warshell Leetcode 743-Network Delay Time
    网络安全—DDoS攻防
    【Linux】 ubuntu系统安装图形化界面
    Kafka的docker安装
    代码随想录算法训练营Day61 | 总结和展望 | Python | 个人记录向
    ipfs 分布式储存说明
    python数据分析(numpy)
    【SpringBoot】SpringBoot:构建安全的Web应用程序
    ​ModbusTCP转Profibus-DP从站网关把modbus的数据传到300plc上的应用方法​​
    如何学习一个大型分布式Java项目
  • 原文地址:https://www.cnblogs.com/newbe36524/p/0x029-I-can-not-manage-to-always-run-task-on-one-thread.html