如何在long-running task中调用async方法

如何,long,running,task,调用,async,方法 · 浏览次数 : 1364

小编点评

**内容简介** 该文章介绍如何使用 async 关键字执行异步方法并避免线程池中的死锁问题。 **主要内容** 1. **线程池和异步方法** - 线程池用于创建并执行异步方法。 -异步方法在线程池中执行,并与线程池中的其他异步方法同步。 2. **await 关键字和线程池死锁** - await 关键字用于阻塞当前线程,直到异步方法执行完毕。 -如果多个 async 方法使用 await 关键字,可能会导致线程池中的死锁。 3. **Task.Wait 方法** - Task.Wait 方法用于等待一个 Task 的执行结果。 -它不会创建新的线程,而是等待现有的线程执行完毕。 4. **示例** - 该文章提供一个示例,展示如何使用 await 关键字执行异步方法并避免线程池中的死锁问题。 **结论** 使用 await 关键字执行异步方法可以避免线程池中的死锁问题。建议使用 Task.Wait 方法等待异步方法的执行结果。

正文

什么是 long-running thread

long-running task 是指那些长时间运行的任务,比如在一个 while True 中执行耗时较长的同步处理。

下面的例子中,我们不断从队列中尝试取出数据,并对这些数据进行处理,这样的任务就适合交给一个 long-running task 来处理。

var queue = new BlockingCollection<string>();

Task.Factory.StartNew(() =>
{
    while (true)
    {
        // BlockingCollection<T>.Take() 方法会阻塞当前线程,直到队列中有数据可以取出。
        var input = queue.Take();
        Console.WriteLine($"You entered: {input}");
    }
}, TaskCreationOptions.LongRunning);


while (true)
{
    var input = Console.ReadLine();
    queue.Add(input);
}

在 .NET 中,我们可以使用 Task.Factory.StartNew 方法并传入 TaskCreationOptions.LongRunning 来创建一个 long-running task。

虽然这种方式创建的 long-running task 和默认创建的 task 一样,都是分配给 ThreadPoolTaskScheduler 来调度的, 但 long-running task 会被分配到一个新的 Background 线程上执行,而不是交给 ThreadPool 中的线程来执行。

class ThreadPoolTaskScheduler : TaskScheduler
{
    // ...
    protected internal override void QueueTask(Task task)
    {
        TaskCreationOptions options = task.Options;
        if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
        {
            // 在一个新的 Background 线程上执行 long-running task。
            new Thread(s_longRunningThreadWork)
            {
                IsBackground = true,
                Name = ".NET Long Running Task"
            }.UnsafeStart(task);
        }
        else
        {
            // 非 long-running task 交给 ThreadPool 中的线程来执行。
            ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
        }
    }
    // ...
}

为什么long-running task要和普通的task分开调度

如果一个task持续占用一个线程,那么这个线程就不能被其他的task使用,这和 ThreadPool 的设计初衷是相违背的。

如果在 ThreadPool 中创建了大量的 long-running task,那么就会导致
ThreadPool 中的线程不够用,从而影响到其他的 task 的执行。

在 long-running task await 一个 async 方法后会发生什么

有时候,我们需要在 long-running task 中调用一个 async 方法。比如下面的例子中,我们需要在 long-running task 中调用一个 async
的方法来处理数据。

var queue = new BlockingCollection<string>();

Task.Factory.StartNew(async () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        await ProcessAsync(input);
        Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, TaskCreationOptions.LongRunning);

async Task ProcessAsync(string input)
{
    // 模拟一个异步操作。
    await Task.Delay(100);
    Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}

while (true)
{
    var input = Console.ReadLine();

    queue.Add(input);
}

TaskScheduler InternalCurrentTaskScheduler()
{
    var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
    return (TaskScheduler)propertyInfo.GetValue(null);
}

连续输入 1、2、3、4,输出如下:

1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
2
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 2, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
3
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 3, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
4
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 4, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True

从执行结果中可以看出,第一次 await 之前,当前线程是 long-running task 所在的线程(thread id: 9),此后就变成了 ThreadPool
中的线程(thread id: 4)。

至于为什么之后一直是 ThreadPool 中的线程(thread id: 4),这边做一下简单的解释。在我以前一篇介绍 await 的文章中介绍了 await 的执行过程,以及 await 之后的代码会在哪个线程上执行。

https://www.cnblogs.com/eventhorizon/p/15912383.html

  1. 第一次 await 前,当前线程是 long-running task 所在的线程(thread id: 9),绑定了 TaskScheduler(ThreadPoolTaskScheduler),也就是说 await 之后的代码会被调度到 ThreadPool 中执行。
  2. 第一次 await 之后的代码被调度到 ThreadPool 中的线程(thread id: 4)上执行。
  3. ThreadPool 中的线程不会绑定 TaskScheduler,也就意味着之后的代码还是会在 ThreadPool 中的线程上执行,并且是本地队列优先,所以一直是 thread id: 4 这个线程在从本地队列中取出任务在执行。

线程池的介绍请参考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/15316955.html

回到本文的主题,如果在 long-running task 使用了 await 调用一个 async 方法,就会导致为 long-running task 分配的独立线程提前退出,和我们的预期不符。

long-running task 中 调用 一个 async 方法的可能姿势

使用 Task.Wait

在 long-running task 中调用一个 async 方法,可以使用 Task.Wait 来阻塞当前线程,直到 async 方法执行完毕。
对于 Task.Factory.StartNew 创建出来的 long-running task 来说,因为其绑定了 ThreadPoolTaskScheduler,就算是使用 Task.Wait
阻塞了当前线程,也不会导致死锁。
并且 Task.Wait 会把异常抛出来,所以我们可以在 catch 中处理异常。

// ...
Task.Factory.StartNew( () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        ProcessAsync(input).Wait();
        Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}");
    }
}, TaskCreationOptions.LongRunning);
// ...

输出如下:

1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 2, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 3, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 4, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False

Task.Wait 并不会对 async 方法内部产生影响,所以 async 方法内部的代码还是按照正常的逻辑执行。这边 ProcessAsync 方法内部打印的
thread id 没变纯粹是因为 ThreadPool 目前就只创建了一个线程,你可以疯狂输入看看结果。

关于 Task.Wait 的使用,可以参考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/17481757.html

使用自定义的 TaskScheduler 来创建 long-running task

Task.Factory.StartNew(async () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine(
            $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        await ProcessAsync(input);
        Console.WriteLine(
            $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());

class CustomerTaskScheduler : TaskScheduler
{
    // 这边的 BlockingCollection 只是举个例子,如果是普通的队列,配合锁也是可以的。
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

    public CustomerTaskScheduler()
    {
        var thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        })
        {
            IsBackground = true
        };
        thread.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

输出如下:

1
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False

因为修改了上下文绑定的 TaskScheduler,会影响到 async 方法内部 await 回调的执行。

这种做法不推荐使用,因为可能会导致死锁。

如果我将 await 改成 Task.Wait,就会导致死锁。

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine(
            $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        ProcessAsync(input).Wait();
        Console.WriteLine(
            $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());

输出如下:

1
Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False

后面就没有输出了,因为死锁了,除非我们在 ProcessAsync 方法内部每个 await 的 Task 后加上ConfigureAwait(false)。

同理,同学们也可以尝试用 SynchronizationContext 来实现类似的效果,同样有死锁的风险。

总结

如果你想要在一个 long-running task 中执行 async 方法,使用 await 关键字会导致 long-running task 的独立线程提前退出。

比较推荐的做法是使用 Task.Wait。如果连续执行多个 async 方法,建议将这些 async 方法封装成一个新方法,然后只 Wait 这个新方法的 Task。

与如何在long-running task中调用async方法相似的内容:

如何在long-running task中调用async方法

# 什么是 long-running thread long-running task 是指那些长时间运行的任务,比如在一个 while True 中执行耗时较长的同步处理。 下面的例子中,我们不断从队列中尝试取出数据,并对这些数据进行处理,这样的任务就适合交给一个 long-running tas

.NET周报 【6月第4期 2023-06-25】

## 国内文章 ### 如何在long-running task中调用async方法 https://www.cnblogs.com/eventhorizon/p/17497359.html long-running task 是指那些长时间运行的任务,比如在一个 while True 中执行耗时较

nordic——long range测试

简介:本案例测试了long range,注意nrf52系列芯片中,部分硬件是不支持CADED的,因此也就是不支持long range,如nrf52832就不支持long range。同时协议栈也是部分支持,部分不支持,支持的如S140,不支持的如S113.所以在开发前需要把软件和硬件都确定好。 测试

网易面试:SpringBoot如何开启虚拟线程?

虚拟线程(Virtual Thread)也称协程或纤程,是一种轻量级的线程实现,与传统的线程以及操作系统级别的线程(也称为平台线程)相比,它的创建开销更小、资源利用率更高,是 Java 并发编程领域的一项重要创新。 PS:虚拟线程正式发布于 Java 长期支持版(Long Term Suort,LT

Vue3 如何接入 i18n 实现国际化多语言

如何在现有 Vue 3.0 + Vite 项目中,引入 i18n 实现国际化多语言,可以手动切换,SEO友好,且完整可用的解决方案。

如何在 Windows 使用 Podman Desktop 取代 Docker Desktop

Podman Desktop 是 Docker Desktop 的免费替代品,是本地开发使用的另一个绝佳选择。它提供了类似的功能集,同时保持完全开源,让您避免使用 Docker 产品的许可问题。在本文中,您将学习如何安装和开始使用 Podman Desktop 来运行容器并部署到 Kubernete

如何在Spring Boot框架下实现高效的Excel服务端导入导出?

前言 Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。今天我们就使用纯前对按表格控件带大家了解,如何在Spring Boot框架下实现Excel服务端导

如何更改.NET中的默认时区?

除了"在操作系统中修改时区信息,然后重启.NET应用程序,使其生效"之外。如何在不修改操作系统时区的前提下,修改.NET中的默认时区呢? 这是一位 同学兼同事 于5月21日在技术群里问的问题,我当时简单地研究了一下,就写出来了。 现在写文章分享给大家,虽然我觉得这种需求非常小众,几乎不会有人用到。

如何在Spring Boot中配置MySQL数据库连接数

1.如何在Spring Boot中配置MySQL数据库的连接数 1.1主要配置 在Spring Boot中配置MySQL数据库连接数通常涉及到两个主要的配置: (1)数据源配置:这通常是在application.properties或application.yml文件中完成的,用于设置数据源的基本参

如何在低代码平台中引用 JavaScript ?

引言 在当今快速发展的数字化时代,企业对业务应用的需求日益复杂且多元。低代码开发平台作为一个创新的解决方案,以直观易用的设计理念,打破了传统的编程壁垒,让非技术人员也能轻松构建功能完备的Web应用程序,无需深入编码。这一特性极大地简化了应用开发流程,加速了业务需求转化为实际应用的速度,为企业带来了前