关于 Task 简单梳理(C#)【并发编程系列_4】

关于,task,简单,梳理,c#,并发,编程,系列 · 浏览次数 : 828

小编点评

**代码生成说明** 代码生成说明包含以下内容: * 代码概述 * 代码示例 * 性能分析 * 优化建议 **代码示例** ```csharp using System.Threading.Tasks; public static class Main { Task[] tasks; public static void Main() { // 创建 7 个任务 tasks = new Task[7]; // 开始执行任务 foreach (var task in tasks) { task.Start(); } // 等待所有任务完成 Task.WaitAll(tasks); // 打印最终结果 Console.WriteLine("所有任务已完成。"); } } ``` **性能分析** 性能分析表明 Task 性能更强于 ThreadPool 性能。这是由于 Task 使用了本地队列,减少了线程之间的资源竞争。 **优化建议** * 使用 Task 的本地队列优化线程之间的资源竞争。 * 减少任务数量以降低线程池的全局队列大小。 * 使用 Task 的优化方法,如 Task.AwaitTaskResult 来减少 CPU 占用。

正文

〇、前言

Task 是微软在 .Net 4.0 时代推出来的,也是微软极力推荐的一种多线程的处理方式。

在 Task 之前有一个高效多线程操作类 ThreadPool,虽然线程池相对于 Thread,具有很多优势避免频繁创建和销毁线程等,但是线程池也有一些使用上的不便,比如不支持取消、完成、失败通知等,也不支持线程执行的先后顺序配置。

为了解决上述痛点,Task 诞生了。Task 就是站在巨人的肩膀上而生,它是基于 ThreadPool 封装。Task 的控制和扩展性很强,在线程的延续、阻塞、取消、超时等方面远胜于 ThreadPool。

本文将对 Task 进行一个详细的介绍。

一、任务如何创建和启动?

创建任务执行任务是可以分离的,也可以同时进行。如下代码有四种开启任务的方式:

  • 第一种:任务 t1 通过调用 Task 类构造函数进行实例化,但仅在任务 t2 启动后调用其 Start() 方法启动。【创建+未启动】
  • 第二种:任务 t2 通过调用 TaskFactory.StartNew(Action<Object>, Object) 方法在单个方法调用中实例化和启动。【创建+启动】
  • 第三种:任务 t3 通过调用 Run(Action) 方法在单个方法调用中实例化和启动。【创建+启动】
  • 第四种:任务 t4 通过调用 RunSynchronously() 方法在主线程上同步执行。【创建+未启动】
static void Main(string[] args)
{
    // 用于异步调用的委托函数,接受类型为 Object 的参数
    Action<object> action = (object obj) =>
    {
        // Task.CurrentId :任务 ID
        // Thread.CurrentThread.ManagedThreadId :线程 ID
        Console.WriteLine($"Task={Task.CurrentId}, obj={obj}, Thread={Thread.CurrentThread.ManagedThreadId}");
        // throw new Exception();
    };

    // 【第一种】创建一个就绪,但【未启动】的任务,需要在后文通过 t1.Start() 启动
    Task t1 = new Task(action, "甲"); // alpha:初始

    // 【第二种】【创建并启动】一个任务
    Task t2 = Task.Factory.StartNew(action, "乙");
    // 占用主线程,等待任务 t2 完成
    t2.Wait();

    // 启动第一个任务 t1
    t1.Start();
    Console.WriteLine($"t1 已启动  (主线程 = {Thread.CurrentThread.ManagedThreadId})");
    // 通过 Wait() 占用主线程,等待 t1 执行完毕
    t1.Wait();

    // 【第三种】通过 Task.Run() 【创建并启动】一个任务
    string taskData = "丙";
    Task t3 = Task.Run(() =>
    {
        Console.WriteLine($"Task={Task.CurrentId}, obj={taskData}, Thread={Thread.CurrentThread.ManagedThreadId}");
    });
    // 通过 Wait() 占用主线程,等待 t3 执行完毕
    t3.Wait();

    // 【第四种】创建一个就绪,但【未启动】的任务 t4
    Task t4 = new Task(action, "丁");
    // Synchronously:同步的
    // 开启同步任务 t4,在主线程上运行
    t4.RunSynchronously();
    // t4 是以同步的方式运行的,此时的 Wait() 可以捕捉到异常
    t4.Wait();
    Console.ReadLine();
}

如下图输出结果,最先开启的 t2,由于是工厂中启动的,所以不占用主线程运行。Task.Run() 同样是非主线程运行,但它并未新开线程,而是直接用了 t2 执行的线程。

线程编号为 1 的是主线程,t1 是主线程最先创建的,所以直接由主线程运行。t4 是在同步执行的任务,因此也是主线程来执行。

  

二、等待一个或多个任务

用于等待任务的方法有很多个,如下:

Wait() task1.Wait() 单线程等待
WaitAll() Task.WaitAll(tasks) 等待任务集合 tasks 中的全部任务完成
WaitAny()  int index = Task.WaitAny(tasks) 等待任一任务完成,并返回这一任务的编号
WhenAll() Task t = Task.WhenAll(tasks) 返回一个新的任务,这个任务的完成状态在【tasks 集合中全部任务都完成时】完成
WhenAny() Task t = Task.WhenAny(tasks) 返回在任务集合 tasks 中第一个执行完成的任务对象

下面几个示例来实操下。

2.1 Wait()

对于 Wait() 单线程等待,没啥好说的,看代码:

static void Main(string[] args)
{
    // 创建并执行一个任务执行匿名函数
    Task taskA = Task.Run(() => Thread.Sleep(2000));
    Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: WaitingToRun
    try
    {
        taskA.Wait(1000); // 主线程等待任务 1s 此时任务尚未完成
        Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: Running
        taskA.Wait(); // 线程等待任务 taskA 完成
        Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: RanToCompletion
    }
    catch (AggregateException)
    {
        Console.WriteLine("Exception in taskA.");
    }
}

2.2 Wait(Int32, CancellationToken)  支持手动取消

关于 Wait(Int32, CancellationToken) 任务可手动取消的重载。在任务完成之前,超时或调用了 Cancel() 方法,等待终止。

如下示例,一个线程一个任务,线程中将 CabcellationTokenSource 的实例 cts 取消掉,导致后续任务等待时调用 cts.Token 导致异常 OperationCanceledException 的发生。

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    Thread thread = new Thread(CancelToken); // 新开一个线程执行方法:CancelToken()
    thread.Start(cts);

    Task t = Task.Run(() => // 新增一个任务执行匿名函数
    {
        Task.Delay(5000).Wait(); // 延迟等待 5s
        Console.WriteLine("Task ended delay...");
    });
    try
    {
        Console.WriteLine($"About to wait completion of task {t.Id}"); // 以上两个操作都有延迟,所以此处消息先打印
        // 等待任务 t 1.51s,保证线程已执行完成,就是保证 CancellationTokenSource 已执行过取消操作
        // 由于 cts 已经取消,因此次数就抛异常:OperationCanceledException
        bool result = t.Wait(1510, cts.Token); // 后边代码就不再执行,直接跳到 catch
        Console.WriteLine($"Wait completed normally: {result}");
        Console.WriteLine($"The task status:  {t.Status}");
    }
    catch (OperationCanceledException e)
    {
        Console.WriteLine($"{e.GetType().Name}: The wait has been canceled.");
        Console.WriteLine($"Task status:{t.Status}"); // 此时程序运行 1.5s 多,任务 t 还在等待,因此状态是 Running
        Thread.Sleep(4000); // 4s + 1.5s > 5s 此时任务 t 已经执行完成,状态为 RanToCompletion 
        Console.WriteLine("After sleeping, the task status:  {t.Status}");
        cts.Dispose();
    }
    Console.ReadLine();
}

private static void CancelToken(Object obj)
{
    Thread.Sleep(1500); // 延迟 1.5s
    Console.WriteLine($"Canceling the cancellation token from thread {Thread.CurrentThread.ManagedThreadId}...");
    CancellationTokenSource source = obj as CancellationTokenSource;
    if (source != null) 
        source.Cancel(); // 将 CancellationTokenSource 的实例执行取消
}

  

2.3 WaitAll()

 等待一组任务全部完成,无论是否抛异常。AggregateException 将会收集全部异常信息,可以通过遍历获取每一个异常详情。

如下代码,新建是个任务组成任务组 tasks,其中 2~5 线程手动抛异常,最后通过遍历 AggregateException aex 记录全部异常。

static void Main(string[] args)
{
    var tasks = new List<Task<int>>();
    // 创建一个委托,用于任务执行,并记录每个任务信息
    Func<object, int> action = (object obj) =>
    {
        int i = (int)obj;
        // 让每次的 TickCount 不同(系统开始运行的毫秒数)
        Thread.Sleep(i * 1000);
        if (2 <= i && i <= 5) // 从第 2 到 5 个任务都抛异常
        {
            throw new InvalidOperationException("SIMULATED EXCEPTION");
        }
        int tickCount = Environment.TickCount; // 获取系统开始运行的毫秒数
        Console.WriteLine($"Task={Task.CurrentId}, i={i}, TickCount={tickCount}, Thread={Thread.CurrentThread.ManagedThreadId}");
        return tickCount;
    };
    // 连续创建 10 个任务
    for (int i = 0; i < 10; i++)
    {
        int index = i;
        tasks.Add(Task<int>.Factory.StartNew(action, index)); // 后台线程
    }
    try
    {
        // WaitAll() 等待全部任务完成
        Task.WaitAll(tasks.ToArray());
        // 由于线程中手动抛出了异常,因此这个消息将无法打印在控制台
        Console.WriteLine("WaitAll() has not thrown exceptions. THIS WAS NOT EXPECTED.");
    }
    catch (AggregateException aex) // AggregateException 异常中包含 2~5 四个异常
    {
        Console.WriteLine("\nThe following exceptions have been thrown by WaitAll(): (THIS WAS EXPECTED)");
        Console.WriteLine($"\ne.InnerExceptions.Count:{aex.InnerExceptions.Count}");
        for (int j = 0; j < aex.InnerExceptions.Count; j++) // aex.InnerExceptions.Count == 4
        {
            Console.WriteLine("\n-------------------------------------------------\n{0}", aex.InnerExceptions[j].ToString());
        }
    }
    Console.ReadLine();
}

   

2.4 WaitAny()

 等待一组任务中的任一任务完成,然后返回第一个执行完成任务的序号,可通过tasks[index].Id取得任务 ID。

如下示例,每个任务都有延迟,当第一个任务完成时,遍历打印出其他全部任务的状态:

static void Main(string[] args)
{
    Task[] tasks = new Task[5];
    for (int ctr = 0; ctr <= 4; ctr++)
    {
        int factor = ctr; // 重新声明一个变量
        tasks[ctr] = Task.Run(() => Thread.Sleep(factor * 250 + 50));
    }
    int index = Task.WaitAny(tasks); // 等待任一任务结束
    Console.WriteLine($"任务 #{tasks[index].Id} 已完成。");
    Console.WriteLine("\n当前各个任务的状态:");
    foreach (var t in tasks)
        Console.WriteLine($"   Task {t.Id}: {t.Status}");
    Console.ReadLine();
}

   

参考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.wait?view=net-7.0

三、延续任务 Task.ContinueWith()

3.1 一个简单的示例

如下代码,首先创建一个耗时的任务 task 并启动,此时也不影响主线程的运行。然后通过task.ContinueWith()在第一个任务执行完成后,执行其中的匿名函数。

static void Main(string[] args)
{
    // 创建一个任务
    Task<int> task = new Task<int>(() =>
    {
        int sum = 0;
        Console.WriteLine($"使用 Task 执行异步操作,当前线程 {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(2000);
        for (int i = 0; i < 100; i++)
        {
            sum += i;
        }
        return sum;
    });
    // 启动任务
    task.Start();
    // 主线程在此处可以执行其他处理
    Console.WriteLine($"1 主线程 {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1000);

    //任务完成时执行处理。
    Task cwt = task.ContinueWith(t =>
    {
        Console.WriteLine($"任务完成后的执行结果:{t.Result} 当前线程 {Thread.CurrentThread.ManagedThreadId}");
    });
    task.Wait();
    cwt.Wait();
    Console.WriteLine($"2 主线程 {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadLine();
}

   

详情可参考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.continuewith?view=net-7.0

3.2 任务的并行与串行

ContinueWith、WaitAll 当这两者结合起来,我们就可以处理复杂一点的东西。比如,现在有 7 个任务,其中 t1 需要串行,t2-t3 可以并行,t4 需要串行,t5-t6 并行,t7 串行。逻辑如下图:

  

public static void Main(string[] args)
{
    ConcurrentStack<int> stack = new ConcurrentStack<int>(); // ConcurrentStack:线程安全的后进先出(LIFO:LastIn-FirstOut)集合
    ConcurrentBag<int> bag = new ConcurrentBag<int>(); // ConcurrentBag:线程安全的无序集合
    // t1先串行
    var t1 = Task.Factory.StartNew(() =>
    {
        stack.Push(1);
        stack.Push(2);
    });

    // t1.ContinueWith() t1 之后,t2、t3并行执行
    var t2 = t1.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // t2,t3并行执行
    var t3 = t1.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // 等待 t2、t3 执行完
    Task.WaitAll(t2, t3);

    //t4串行执行
    var t4 = Task.Factory.StartNew(() =>
    {
        stack.Push(1);
        stack.Push(2);
    });

    // t5、t6 并行执行
    var t5 = t4.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // t5、t6 并行执行
    var t6 = t4.ContinueWith(t =>
    {
        int result;
        // 只弹出,不移除
        stack.TryPeek(out result);
    });
    // 临界区:等待 t5、t6 执行完
    Task.WaitAll(t5, t6);

    // t7 串行执行
    var t7 = Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"当前集合元素个数:{stack.Count}"); // 当前集合元素个数:1
    });
    Console.ReadLine();
}

四、为什么性能方面 Task > ThreadPool ?

ThreadPool 使用的是线程池全局队列,全局队列中的线程依旧会存在竞争共享资源的情况,从而影响性能。

Task 基于 ThreadPool 实现,相当于 ThreadPoll 的优化版。它不再使用线程池的全局队列,而是使用的本地队列,使线程之间的资源竞争减少。同时 Task 提 供了丰富的 API 来管理线程、控制。

但是相对 Thread 和 ThreadPool 的内存方面的消耗,Task 依赖于 CPU 对于多核的 CPU 性能就更强了,单核的 CPU 三者的性能没什么差别。推荐参考:关于线程池和队列

参考: https://www.cnblogs.com/huangxincheng/archive/2012/04/03/2430638.html

https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task?view=net-7.0

https://www.cnblogs.com/zhaoshujie/p/11082753.html

与关于 Task 简单梳理(C#)【并发编程系列_4】相似的内容:

关于 Task 简单梳理(C#)【并发编程系列_4】

Task 是微软在 .Net 4.0 时代推出来的,也是微软极力推荐的一种多线程的处理方式,本文简单梳理下它的用法。

es针对nested类型数据无法进行过滤查询的问题记录

问题描述 es中存在有一个名为task_data_1的索引,其字段映射关系如下所示: { "task_data_1" : { "mappings" : { "dynamic_templates" : [ { "dates" : { "match_mapping_type" : "date", "ma

推荐系统:精排多目标融合与超参数学习方法

粗排/精排的个性化多任务学习模型,能预估20多个不同的预估值,如点击率、有效播放率、播放时长、点赞率、关注率等,那如何用它来排序呢?从多任务学习到多目标排序,中间有一个过渡,即如何把这些预估值融合成一个单一的排序分,最后实现多目标精排。这也就引入了本文要介绍的正题:多目标融合(multi-task ...

关于面向对象的方法并行执行的问题

LabVIEW的从同一个类实例化的多个对象如何执行各自的方法呢? 这几天跟同事讨论到LabVIEW的面向对象编程中,如果我设计的一个类有一个方法比较耗时,那么当我实例化多个对象时,那么这个耗时的方法是怎么执行的呢?是各自并行执行还是,必须等某一个对象的方法调用完,接下来调用第二个对象的该方法呢? 接

关于ComfyUI的一些Tips

关于ComfyUI的一些Tips 前言: 最近发的ComfyUI相关文章节奏不知道会不会很快,在创作的时候没有考虑很多,想着把自己的知识分享出去。后台也看到很多私信,有各种各样的问题,这是我欠缺考虑了,今天这篇文章呢,根据私信的问题我大致整理了一下,给大家一些小tips。 目录 一、将 ComfyU

关于领域驱动设计,大家都理解错了

翻遍整个互联网,我发现,关于领域驱动设计,大家都**理解错了**。 今天,我们尝试通过一篇文章的篇幅,给大家展示一个完全不同的视角,把“领域驱动设计”这六个字解释清楚。 ## 领域驱动设计学习资料现状 领域驱动设计的概念提出已经有20年的时间了,整个互联网充斥着大量书籍、文章和视频教程,这里我列举几

关于docker-compose up -d 出现超时情况处理

由于要搭建一个ctf平台,用docker一键搭建是出现超时情况 用了很多办法,换源,等之类的一样没办法,似乎它就是只能用官方那个一样很怪。 只能用一种笨办法来处理了,一个个pull。 打个比如: 打开相对应docker-compose.yml文件 可以看到image就是需要去下载的。那么此时你就可以

关于 KL 散度和变分推断的 ELBO

ELBO 用于最小化 q(z|s) 和 p(z|s) 的 KL 散度,变成最大化 p(x|z) 的 log likelihood + 最小化 q(z|s) 和先验 p(z) 的 KL 散度。

关于面试被面试官暴怼:“几年研究生白读” 的前因后果

中午一个网友来信说自己和面试官干起来了,看完他的描述真是苦笑不得,这年头是怎么了,最近互联网CS消息满天飞,怎么连面试官都SB起来了呢? 大概是这样的:这位网友面试时被问及了Serializable接口的底层实现原理,因为这是一个标识性的空接口,大部分同学在学习时都秉持着会用就行(说实话,Build

关于vue中image控件,onload事件里,event.target 为null的奇怪问题探讨

废话不多说(主要文笔比较差),直接上代码 一个简单的demo,如下 vue代码 imgLoaded(e) { deb