C#异步编程是怎么回事(番外)

· 浏览次数 : 4

小编点评

在上篇通信协议碰到了多线程、阻塞、非阻塞、锁、信号量等问题后,确实很有必要研究多线程与异步编程。以一个简单的例子为例,展示了如何实现异步编程。 在这个例子中,程序会提示用户输入 "time" 或 "job [name]",然后根据用户输入执行相应的操作。如果输入 "time",则输出当前时间;如果输入 "job [name]",则启动一个异步任务,执行指定的工作。为了更好地理解上下文流转和多线程,我们在这个例子中引入了一些必要的变量和异常处理。 首先,当用户输入 "time" 时,程序会调用 `PrintCurrentTime()` 函数输出当前时间。这个函数是在主线程中执行的,并且不会阻塞主线程。接着,程序会继续执行其他任务,直到用户输入 "job [name]"。 当用户输入 "job [name]" 时,程序会调用 `StartJob()` 函数启动一个异步任务。这个函数会将任务的名称作为参数传递,并使用 `Task.Delay(10000)` 模拟一个耗时任务。在这个过程中,`StartJob()` 函数会创建一个新的任务并启动它。由于 `Task.Delay(10000)` 是一个异步任务,它会利用线程池中的一个线程来执行。这意味着主线程不会被阻塞。 在这个例子中,`StartJob()` 函数的第二个参数是一个 `StartJobAsyncMachine` 对象,它是用来管理异步操作的。这个对象会追踪任务的进度,并在任务完成时通知 `StartJob()` 函数。 总之,这个例子虽然简单,但它涵盖了异步编程的主要概念,包括上下文流转、多线程、阻塞与非阻塞、锁和信号量等。通过这个例子,我们可以更好地理解异步编程的概念和应用。

正文

在上一篇通信协议碰到了多线程,阻塞、非阻塞、锁、信号量...,会碰到很多问题。因此我感觉很有必要研究多线程异步编程

首先以一个例子开始

image

我说明一下这个例子。
这是一个演示异步编程的例子。

  • 输入job [name],在一个同步的Main方法中,以一发即忘的方式调用异步方法StartJob()
  • 输入time,调用同步方法PrintCurrentTime()输出时间。
  • 输出都带上线程ID,便于观察。
    可以看到,主线程不会阻塞。主线程在同步方法中使用一发即忘的方式调用异步方法时,在异步方法中碰到阻塞时,主线程返回同步方法中继续执行。而异步方法在另一个线程中继续执行。
    程序如下
internal class Program
{
    static void Main(string[] args)
    {
        while (true)
        {
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Enter 'time' to get current time or 'job [name]' to start a job:");
            string input = Console.ReadLine();

            if (input.StartsWith("time"))
            {
                // 输出当前时间
                PrintCurrentTime();
            }
            else if (input.StartsWith("job"))
            {
                // 启动一个异步任务,执行指定的工作
                string[] parts = input.Split(new char[] { ' ' }, 2);
                string jobName = parts.Length > 1 ? parts[1] : string.Empty;
                StartJob(jobName);
            }
            else
            {
                Console.WriteLine("Invalid input. Please try again.");
            }
        }
    }

    static void PrintCurrentTime()
    {
        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Current time: {DateTime.Now}");
    }

    static async void StartJob(string jobName)
    {
        // 获取主线程的线程 ID
        int mainThreadId = Thread.CurrentThread.ManagedThreadId;

        // 检查是否在主线程上
        bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;

        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");

        // 输出主线程上下文移动情况
        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");

        await Task.Delay(10000); // 模拟任务需要10秒钟完成

        // 输出任务完成信息及上下文移动情况
        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
    }

}

上下文流转

一个方法从一个线程代码栈被切换,或者说被剪切到另一个线程代码栈上去,可以称为上下文流转
这对于理解异步编程是一个重要的点。
但由于上面的程序缺少必要变量,我需要在不同位置加几个变量,来展示上下文确实被移动了。

static async void StartJob(string jobName)
{
	int mainThreadId = Thread.CurrentThread.ManagedThreadId;
	// 检查是否在主线程上
	bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;
	...
}

image
可以看到onMainThread一直为False,这个变量从线程1移动到线程5
而且bool是值类型,在栈上面,这说明StartJob这段代码确实移动到线程5的栈上面去了。(每个线程都有一个调用栈)

使用VS调试窗口监视线程

想要再进一步,更清晰的话说明上下文流转的话,那就得监视这两个线程栈的内容了。万幸的是 vs提供了这个功能,调试 > 窗口 > 并行堆栈

  • 命中断点时,StartJob方法在主线程24876上
    image

  • 10秒后再次命中,StartJob方法跑到了任务线程上。而主线程现在在Main函数的Console.ReadLine()那里阻塞
    image

  • 代码阻塞与线程阻塞
    在上面的例子中我们引出两种现象,代码阻塞线程阻塞
    代码阻塞时,线程不一定阻塞,原线程没有阻塞,去执行别的代码了,而由新线程接手当前上下文和调用栈阻塞在这里,比如这里的await Task.Delay(10000)
    代码阻塞时线程也可能阻塞,比如lock(lockObj)Console.ReadLine()
    为了方便,我们姑且这样命名吧

    • 代码阻塞时,线程不阻塞称之为等待await
    • 代码阻塞时,线程也阻塞称之为阻塞block
  • 为什么有两个箭头
    这里为什么有线程24666和27548两个NET TP Worker(.NET Thread Pool (TP) Worker)?据chatGPT解释,Delay语句在线程池中找了一个线程去执行,一旦延迟时间到达,StartJob会在其中一个线程池线程上恢复执行。计时是一个线程,恢复上下文是另一个线程。Delay就代表了我们的那个耗时线程(不是异步方法所在线程)。
    既然有两个线程的联动,其中就出现了一些熟悉的东西。信号量Semaphore,一次性信号量的消耗TrySetResult,但详细过程我还不清楚。
    MSDN上的例子也是这样
    image

以同步的方式进行异步编程

原来把异步方法的上下文移动到新线程N,保证主线程不阻塞(脱离主线程U)。然后N用第三个线程C执行耗时任务,最后把C结果给位于N中的上下文。
站在代码编写者的角度,不特意去看线程的话,就不会注意到异步方法的上下文从一个线程跑到另一个线程上去了。这就是所谓的以同步的方式进行异步编程。
那么线程N的执行就明晰了。先保存上下文,然后启用新线程C进行耗时任务,并阻塞。等C使用信号量或其他什么通知N时,N再根据C的结果继续执行。
可以这样总结

  • asyncawait是一个语法糖。
  • 以同步的方式进行异步编程的方式是使用语法糖,以同步的方式书写代码,然后编译成适当的异步的实现。

我列出几种可能的异步的实现

1. 异步状态机

  • 异步状态机是C#编译async语法糖的实现方式
  • 异步方法StartJob将会被编译成一个同步方法StartJobAsync和一个状态机StartJobAsyncMachine
  • 状态机流转上下文的方式是将新线程用到的变量提升为字段,储存于可被线程共享的进程堆中
  • MoveNext方法可以被不同线程执行,这是关键
点击查看代码
internal class Program
{
    ...

    internal static void StartJobAsync(string jobName)
    {
        StartJobAsyncMachine stateMachine = new StartJobAsyncMachine();
        stateMachine.builder = AsyncVoidMethodBuilder.Create();
        stateMachine.jobName = jobName;
        stateMachine.state = -1;
        stateMachine.builder.Start(ref stateMachine);
    }

    public sealed class StartJobAsyncMachine : IAsyncStateMachine
    {
        public int state;

        public AsyncVoidMethodBuilder builder;

        private TaskAwaiter taskAwaiter;

        //形参会编译成public字段
        public string jobName;
        //被新线程使用的局部变量会编译成private字段
        private bool onMainThread;

        private void MoveNext()
        {
            int num = state;
            try
            {
                TaskAwaiter awaiter;
                if (num != 0)
                {
                    // 获取主线程的线程 ID
                    int mainThreadId = Thread.CurrentThread.ManagedThreadId;

                    // 检查是否在主线程上
                    onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;

                    Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");

                    // 输出主线程上下文移动情况
                    Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");
                    awaiter = Task.Delay(10000).GetAwaiter();

                    if (!awaiter.IsCompleted)
                    {
                        num = (state = 0);
                        taskAwaiter = awaiter;
                        StartJobAsyncMachine stateMachine = this;
                        builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                }
                else
                {
                    awaiter = taskAwaiter;
                    taskAwaiter = default(TaskAwaiter);
                    num = (state = -1);
                }
                awaiter.GetResult();
                // 输出任务完成信息及上下文移动情况
                Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
            }
            catch (Exception exception)
            {
                state = -2;
                builder.SetException(exception);
                return;
            }
            state = -2;
            builder.SetResult();
        }

        void IAsyncStateMachine.MoveNext()
        {
            this.MoveNext();
        }

        private void SetStateMachine(IAsyncStateMachine stateMachine)
        {
        }

        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
        {
            this.SetStateMachine(stateMachine);
        }

    }
}

StartJobAsync的调用和原方法等效。我在Main中在加一种指令jobMachine调用StartJobAsync。原来的改为job空格

else if (input.StartsWith("jobMachine "))
{
    // 启动一个异步任务,执行指定的工作
    string[] parts = input.Split(new char[] { ' ' }, 2);
    string jobName = parts.Length > 1 ? parts[1] : string.Empty;
    StartJobAsync(jobName);
}

image

2. 协程

这种方法到底叫协程还是异步迭代器,我不太分得清,但目的是能够达到的,我暂且就叫做协程好了。
虽然这种做法就像脱裤子放屁,因为协程最后也会编译成状态机。这个例子主要是为了演示直观。
理论上,C#中的异步/等待(async/await)语法并不是直接编译成协程的,而是由编译器生成状态机(state machine)来管理异步操作。但是,我们可以通过理解协程的工作原理以及C#异步/等待模型的特性,来描绘一种可能的编译结果。
这里我写了一个基于协程的异步的实现。效果和原来的等同。

  • 原理
    和状态机实现基本一样。对于每个async方法生成一个协程。而且在异步方法嵌套时,那么async方法内部的async方法在编译时就不需要开一个新线程了。要不然得多少线程。
internal class Program
{
    static void Main(string[] args)
    {
        while (true)
        {
            ...
            else if (input.StartsWith("jobCorotine "))
            {
                // 启动一个异步任务,执行指定的工作
                string[] parts = input.Split(new char[] { ' ' }, 2);
                string jobName = parts.Length > 1 ? parts[1] : string.Empty;
                StartJobAsync_2(jobName);
            }
            ...
        }
    }

    #region 异步协程
    static void StartJobAsync_2(string jobName)
    {
        StartJobAsyncCorotine startJobCorotine = new StartJobAsyncCorotine();
        startJobCorotine.jobName = jobName;
        var enumerator = startJobCorotine.DelayedOperations();
        var iterator = enumerator.GetEnumerator();
        bool next = false;
        while (true)
        {
            next = iterator.MoveNext();
            if (!iterator.Current.IsCompleted)
            {
                //异步方法中存在耗时任务,切换到新线程
                break;
            }
            next = false;
        }
        if (next == false)
        {
            return;
        }
        //异步方法存在耗时任务,切换上下文到新线程
        Task.Run(() =>
        {
            do
            {
                if (!iterator.Current.IsCompleted)
                {
                    //创建耗时任务线程进行耗时任务
                    Task.Run(() =>
                    {
                        iterator.Current.GetResult();
                    }).Wait();
                }
            }
            while (iterator.MoveNext());
        });
    }

    public sealed class StartJobAsyncCorotine
    {
        //形参因为需要运行时赋值,只能写成字段的形式
        public string jobName;

        public int Count = 1;

        public IEnumerable<TaskAwaiter> DelayedOperations()
        {
            TaskAwaiter awaiter1;

            // 获取主线程的线程 ID
            int mainThreadId = Thread.CurrentThread.ManagedThreadId;

            // 检查是否在主线程上
            bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;

            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");

            // 输出主线程上下文移动情况
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");

            awaiter1 = Task.Delay(10000).GetAwaiter(); // 模拟任务需要10秒钟完成
            //出去判断这是否是耗时任务以切换线程
            yield return awaiter1;

            // 输出任务完成信息及上下文移动情况
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
        }
    }
    #endregion
}
  • 效果确实和原来一样

image

3. 闭包

这真不需要多说,通过闭包进行捕获上下文真的是太常见了,Ajax中用到吐🤮

带返回值的上下文流转

StartJob是没有返回值的,假如我们需要一个返回值呢,比如一个bool,用于判断接下来的执行流程。
调用异步方法StartJob的同步方法Main之间存在着绝对的分界线——两个线程。同步方法不会被交给异步方法中的那个新线程,没法在同步方法中以同步的方式进行异步编程
唯一的一点看头是,至少Task还给我们留下了一个回调ContinueWith可用。但条件允许的话,何不把回调的内容写在异步方法内部呢?

与C#异步编程是怎么回事(番外)相似的内容:

C#异步编程是怎么回事(番外)

在上一篇通信协议碰到了多线程,阻塞、非阻塞、锁、信号量...,会碰到很多问题。因此我感觉很有必要研究多线程与异步编程。 首先以一个例子开始 我说明一下这个例子。 这是一个演示异步编程的例子。 输入job [name],在一个同步的Main方法中,以一发即忘的方式调用异步方法StartJob()。 输

C#委托

目录C# 委托委托是什么?基本语法委托的常见用法总结引用 C# 委托 委托是什么? ** 委托定义一种类型,该类型封装一个或多个方法(一个或多个方法指向委托实例)。** 委托是一种指向方法的引用。它允许您将方法存储在变量中,并像调用普通方法一样调用它们。委托通常用于事件处理 和异步编程。 基本语法

CUDA C编程权威指南:2.1-CUDA编程模型

本文主要通过例子介绍了CUDA异构编程模型,需要说明的是Grid、Block和Thread都是逻辑结构,不是物理结构。实现例子代码参考文献[2],只需要把相应章节对应的CMakeLists.txt文件拷贝到CMake项目根目录下面即可运行。 1.Grid、Block和Thread间的关系 GPU中最

CUDA C编程权威指南:1-基于CUDA的异构并行计算

什么是CUDA?CUDA(Compute Unified Device Architecture,统一计算设备架构)是NVIDIA(英伟达)提出的并行计算架构,结合了CPU和GPU的优点,主要用来处理密集型及并行计算。什么是异构计算?这里的异构主要指的是主机端的CPU和设备端的GPU,CPU更擅长逻

C#异步有多少种实现方式?

前言 微信群里的一个提问引发的这个问题,有同学问:C#异步有多少种实现方式?想要知道C#异步有多少种实现方式,首先我们要知道.NET提供的执行异步操作的三种模式,然后再去了解C#异步实现的方式。 .NET异步编程模式 .NET 提供了执行异步操作的三种模式: 基于任务的异步模式 (TAP) ,该模式

多线程合集(三)---异步的那些事之自定义AsyncTaskMethodBuilder

引言 之前在上一篇文章中多线程合集(二) 异步的那些事,async和await原理抛析,我们从源码去分析了async和await如何运行,以及将编译后的IL代码写成了c#代码,以及实现自定义的Awaiter,自定义异步状态机同时将本系列的第一篇文章的自定义TaskScheduler和自定义的Awai

【C#异步】异步多线程的本质,上下文流转和同步

引言 net同僚对于async和await的话题真的是经久不衰,这段时间又看到了关于这方面的讨论,最终也没有得出什么结论,其实要弄懂这个东西,并没有那么复杂,简单的从本质上来讲,就是一句话,async 和await异步的本质就是状态机+线程环境上下文的流转,由状态机向前推进执行,上下文进行环境切换,

C# - 能否让 SortedSet.RemoveWhere 内传入的委托异步执行

若想充分利用 `RemoveWhere` 带来的性能优势,建议传入判断是否删除元素的委托内采取同步操作。若一定要在该委托内使用异步操作,可以采用本文中绕行的方法,但摈弃了 `RemoveWhere` 所带来的性能优势。

在C#中使用RabbitMQ做个简单的发送邮件小项目

在C#中使用RabbitMQ做个简单的发送邮件小项目 前言 好久没有做项目了,这次做一个发送邮件的小项目。发邮件是一个比较耗时的操作,之前在我的个人博客里面回复评论和友链申请是会通过发送邮件来通知对方的,不过当时只是简单的进行了异步操作。 那么这次来使用RabbitMQ去统一发送邮件,我的想法是通过

记一次RocketMQ消费非顺序消息引起的线上事故

应用场景 C端用户提交工单、工单创建完成之后、会发布一条工单创建完成的消息事件(异步消息)、MQ消费者收到消息之后、会通知各处理器处理该消息、各处理器处理完后都会发布一条将该工单写入搜索引擎的消息、最终该工单出现在搜索引擎、被工单处理人检索和处理。 事故异常体现 1、异常体现 从工单的流转记录发现、