【译】You probably should stop using a custom TaskScheduler

you,probably,should,stop,using,custom,taskscheduler · 浏览次数 : 0

小编点评

本文介绍了.NET的Task Scheduler及其在自定义实现中的应用。Task Scheduler是.NET 4.0引入的一个库,用于并行编程。本文将探讨Task Scheduler的基本概念、自定义实现以及如何避免死锁等问题。 1. 什么是Task Scheduler? Task Scheduler是.NET 4.0引入的一个库,用于并行编程。它提供了一种定义如何在运行时执行任务的策略。默认情况下,Task Scheduler使用ThreadPoolTaskScheduler来管理任务,但在某些情况下,可以使用自定义的Task Scheduler来实现特定的并发需求。 2. 自定义Task Scheduler 自定义Task Scheduler允许开发者在自己的应用程序中控制任务的执行顺序和并发性。例如,可以实现DedicatedThreadsTaskScheduler、LimitedConcurrencyLevelTaskScheduler等自定义Task Scheduler,以满足不同的并发需求。 3. 避免死锁 在自定义Task Scheduler中,需要注意避免死锁的问题。死锁通常发生在多个任务依赖于同一个Task Scheduler时。为了避免这种情况,可以在每个异步方法中使用ConfigureAwait(false),以确保任务能够在不同的Task Scheduler上独立运行。 4. 更好的并发控制工具 对于CPU密集型任务,可以考虑使用PLINQ或ActionBlock等高级工具。对于IO绑定和异步任务,可以使用Parallel.ForEachAsync或Polly.RateLimiting等工具来实现更明确的并发控制。 总之,Task Scheduler是一个强大的工具,可以帮助开发者实现复杂的并行和并发任务。然而,在使用Task Scheduler时,需要注意避免死锁和其他并发问题。在某些情况下,使用更高级别的并发控制工具可能是一个更好的选择。

正文

来自Sergey Tepliakov的 https://sergeyteplyakov.github.io/Blog/csharp/2024/06/14/Custom_Task_Scheduler.html

如果你不知道什么是TaskScheduler 或你的项目中没有它的自定义实现,你可能可以跳过这篇文章。但如果你不知道它是什么,但你的项目中确实有一两个,那么这篇文章绝对适合你。

让我们从基础开始。任务并行库(也称为TPL)引入于2010年的NET 4.0。当时它主要用于并行编程,而不是异步编程,因为异步编程在C#4和NET 4.0中不是一等公民。

例如,体现在TPL API中,Task.Factory.StartNew的入参为委托,返回voidT,而不是TaskTask<T>:

var task = Task.Factory.StartNew(() =>
	 {
		Console.WriteLine("Starting work...");
		Thread.Sleep(1000);
		Console.WriteLine("Done doing work.");
	});

Task.Factory.StartNew 有相当多的重载,其中一个需要 TaskScheduler .这是一种定义如何在运行时执行任务的策略。

默认情况下(如果未传递自定义 TaskScheduler 项,同时 TaskCreationOptions.LongRunning 未传递自定义项),则使用默认 TaskScheduler 。这是一个称为 ThreadPoolTaskScheduler 的内部类型,它使用 .NET 线程池来管理任务。(如果 传递TaskCreationOptions.LongRunning参数 给 Task.Factory.Startnew ,则使用专用线程来避免长时间使用线程池中的线程)。
与任何新技术一样,当 TPL 发布时,书呆子们很兴奋,并试图尽可能多地使用(和滥用)新技术。如果Microsoft给你一个可扩展的库,有些人认为这是一个好主意......你知道的。。。扩展它。
最常见的模式之一是并发限制,它使用固定数量的专用线程来确保您不会超额订阅 CPU:

public sealed class DedicatedThreadsTaskScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly List<Thread> _threads;

    public DedicatedThreadsTaskScheduler(int threadCount)
    {
        _threads = Enumerable.Range(0, threadCount).Select(i =>
        {
            var t = new Thread(() =>
            {
                foreach (var task in _tasks.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            })
            {
                IsBackground = true,
            };

            t.Start();
            return t;

        }).ToList();
    }

    protected override void QueueTask(Task task) => _tasks.Add(task);

    public override int MaximumConcurrencyLevel => _threads.Count;

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _tasks;
}

此外还有很多其他实现执行相同的操作: DedicatedThreadTaskScheduler 、、 DedicatedThreadsTaskScheduler , LimitedConcurrencyLevelTaskScheduler 甚至 IOCompletionPortTaskScheduler 使用 IO 完成端口来限制并发性。

无论实现和幻想如何,它们都做同样的事情:它们最多允许同时执行给定数量的任务。下面是一个示例,说明我们如何使用它来强制最多同时运行 2 个任务:

var sw = Stopwatch.StartNew();
// Passing 2 as the threadCount to make sure we have at most 2 pending tasks.
var scheduler = new DedicatedThreadsTaskScheduler(threadCount: 2);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
    int num = i;
    var task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
        Thread.Sleep((num + 1) * 1000);
        Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Finishing {num}");
    }, CancellationToken.None, TaskCreationOptions.None, scheduler);
    
    tasks.Add(task);
}

await Task.WhenAll(tasks);

在本例中,我们在循环中创建任务,实际上它可能在某种请求中。下面是输出:

0.0154143: Starting 0...
0.0162219: Starting 1...
1.0262272: Finishing 0
1.0265169: Starting 2...
2.0224863: Finishing 1
2.0227441: Starting 3...
4.0417418: Finishing 2
4.041956: Starting 4...
6.0332304: Finishing 3
9.0453789: Finishing 4

正如你所看到的,一旦任务 0 完成,我们会立即安排任务 1 等,所以实际上我们在这里限制了并发性。

但是让我们做一点点小小的改动:

static async Task FooBarAsync()
{
    await Task.Run(() => 42);
}

...
var task = Task.Factory.StartNew(() =>
{
    Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
    Thread.Sleep((num + 1) * 1000);
    FooBarAsync().GetAwaiter().GetResult();
    Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Finishing {num}");
}, CancellationToken.None, TaskCreationOptions.None, scheduler);

输出为:

0.0176502: Starting 1...
0.0180366: Starting 0...

是的。死锁了!为什么?让我们更新一个示例以更好地查看问题:让我们跟踪当前 TaskScheduler 并将循环中创建的任务数减少到 1:

static void Trace(string message) => 
    Console.WriteLine($"{message}, TS: {TaskScheduler.Current.GetType().Name}");

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42);
    Trace("Finishing FooBarAsync");
}

static async Task Main(string[] args)
{
    var sw = Stopwatch.StartNew();
    var scheduler = new DedicatedThreadsTaskScheduler(threadCount: 2);
    var tasks = new List<Task>();
    for (int i = 0; i < 1; i++)
    {
        int num = i;
        var task = Task.Factory.StartNew(() =>
        {
            Trace($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
            Thread.Sleep((num + 1) * 1000);
            FooBarAsync().GetAwaiter().GetResult();
            Trace($"{sw.Elapsed.TotalSeconds}: Finishing {num}...");
        }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        
        tasks.Add(task);
    }

	Trace("Done scheduling tasks...");
    await Task.WhenAll(tasks);
}

输出为:

0.018728: Starting 0..., TS: DedicatedThreadsTaskScheduler
Starting FooBarAsync, TS: DedicatedThreadsTaskScheduler
Finishing FooBarAsync, TS: DedicatedThreadsTaskScheduler
1.028004: Finishing 0..., TS: DedicatedThreadsTaskScheduler
Done scheduling tasks..., TS: ThreadPoolTaskScheduler

现在应该相对容易理解发生了什么以及为什么当我们尝试运行超过 2 个任务时会陷入死锁。请记住,异步方法中的每个步骤(关键字 await后的代码)本身就是一个任务,由任务调度程序逐个执行。默认情况下,任务调度程序是粘性的:如果TaskScheduler是在创建任务时提供的,那么所有后续的Task都将使用相同的TaskScheduler这意味着TaskScheduler贯穿所有异步方法中的 awaits。

在我们的例子中,这意味着当完成 FooAsync时 ,我们 DedicatedThreadsTaskScheduler 被调用来运行它的后续的Task(译者注:即await Task.Run(() => 42);)。但是它已经忙于运行所有任务,因此它无法在 FooAsync 末尾运行一段微不足道的代码。而且由于 FooAsync 无法完成,我们无法立即完成Task。导致死锁。

我们能做些什么来解决这个问题?

解决方案

有几种方法可以避免此问题:

1. Use ConfigureAwait(false)

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42);
    Trace("Finishing FooBarAsync");
}

我们在这里看到的问题与UI案例中的死锁非常相似,当任务被阻塞并且单个UI线程无法运行继续时。

我们可以通过确保每个异步方法都有 ConfigureAwait(false) 来避免这个问题。下面是具有以下 FooBarAsync 的实现时的输出。

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42).ConfigureAwait(false);
    Trace("Finishing FooBarAsync");
}
0.0397394: Starting 0..., TS: DedicatedThreadsTaskScheduler
Starting FooBarAsync, TS: DedicatedThreadsTaskScheduler
**Finishing FooBarAsync, TS: ThreadPoolTaskScheduler**
1.0876967: Finishing 0..., TS: DedicatedThreadsTaskScheduler

有人可能会说这是解决这个问题的正确方法,但我不同意。在我们的一个项目中,有一个实际案例,一个很难修复的库代码中存在阻塞异步方法。你可以通过使用分析器来确保你的代码遵循最佳实践,但期望每个人都遵循这些最佳实践是不切实际的。

(译者注:同样可以使用Fody来自动实现追加.ConfigureAwait(false);)

这里最大的问题是,这是一个不常见的情况。有许多后端系统在没有 ConfigureAwait(false) 的情况下工作得很好,因为团队没有任何带有同步上下文的 UI,而且任务调度程序的行为方式相同这一事实并不广为人知。

我只是觉得有更好的选择。

2. 以更明确的方式控制并发

我认为并发控制(又称速率限制)是应用程序非常重要的方面,重要的方面应该是明确的。

TaskScheduler 相当低级别的工具,我宁愿拥有更高级别的工具。如果工作是 CPU 密集型的,那么 PLINQ 或类似 ActionBlock TPL DataFlow 的东西可能是更好的选择。
 
如果工作主要是 IO 绑定和异步的,那么可以使用 Parallel.ForEachAsync 或 Polly.RateLimiting 基于 的 SemaphoreSlim 自定义帮助程序类。

结论

自定义TaskScheduler 只是一个工具,与任何工具一样,它可能被正确或错误地使用。如果您需要一个了解 UI 的调度程序,那TaskScheduler 适合您。但是,是否应该在应用中使用一个进行并发和并行控制?我会投反对票。如果团队可能在多年前有正当理由来使用,但请仔细检查这些理由今天是否存在。

是的,请记住,阻塞异步调用可能会以多种方式反噬,TaskScheduler 只是其中之一。因此,我建议对每个阻塞异步调用的地方进行备注,解释为什么您认为这样做既安全又有用。

与【译】You probably should stop using a custom TaskScheduler相似的内容:

【译】You probably should stop using a custom TaskScheduler

来自Sergey Tepliakov的 https://sergeyteplyakov.github.io/Blog/csharp/2024/06/14/Custom_Task_Scheduler.html 如果你不知道什么是TaskScheduler 或你的项目中没有它的自定义实现,你可能可以跳过

你做的 9 件事表明你不是专业的 Python 开发人员

本文转载自国外论坛 medium,原文地址: https://medium.com/navan-tech/7-java-features-you-might-not-have-heard-of-adee8166d942,由博主简译后给大家带来! Show me your code and I wil

【译】使 Visual Studio 更加可视化

任何 Web、桌面或移动开发人员都经常使用图像。你可以从 C#、HTML、XAML、CSS、C++、VB、TypeScript 甚至代码注释中引用它们。有些图像是本地的,有些存在于线上或网络共享中,而其他图像可能仅以 base64 编码字符串的形式存在。我们在代码中以多种方式引用它们,但总是作为字符

【译】在调试时轻松导航代码委托

我通常希望在单步执行之前或之后快速导航到由委托表示的底层代码,随着 Visual Studio 17.10 的最新更新,这非常容易。

【译】了解17.10 GA 中最新的 Git 工具特性

我们相信提高开发和团队协作的生产力可以帮助您产生更好的软件解决方案。这就是为什么 Visual Studio 版本控制团队发布了新特性,简化了内部循环和代码审查体验。

【译】Visual Studio 2022 - 17.10 性能增强

我们很高兴地宣布 Visual Studio 2022 的最新更新,它为您带来了 IDE 各个领域的一系列性能增强。在这篇博客中,我们将重点介绍17.10版本中一些最显著的改进,比如更快的 Windows Forms 设计器加载、更快的 Razor 着色、更快的解决方案加载以及更少的 dll 开销。

【译】VisualStudio.Extensibility 17.10:用 Diagnostics Explorer 调试您的扩展

VisualStudio. Extensibility 帮助您构建在主 IDE 进程之外运行的扩展,以提高性能和可靠性。它还提供了一个时尚而直观的基于 .NET 8 的 API 和全面且维护良好的文档,可以帮助您开发出色的扩展。

【译】向您介绍改版的 Visual Studio 资源管理器

随着最近 Visual Studio 的资源管理器的改进,开发人员将得到一种全新的享受!我们非常激动地宣布重新设计的 Visual Studio 资源管理器,相信我们,它将改变游戏规则。

【译】Visual Studio 17.10 发布了新版扩展管理器

从 Visual Studio 17.10 开始提供新的扩展管理器作为默认预览功能。我们已将基本功能简化为现代风格 UI,以帮助您发现新的扩展并管理已安装的扩展。

【译】通过出色的开发人员体验,将乐趣最大化,将痛苦最小化

有一条通往开发者幸福的道路——开发者可以更多地专注于编码,而不是运维;在那里他们可以以创造力的速度进行创新;他们可以快速发展,而不必担心管理他们的应用程序所依赖的基础设施。