自己动手实现一个轻量无负担的任务调度ScheduleTask

scheduletask · 浏览次数 : 0

小编点评

## Summary of the Code This code demonstrates the registration and execution of two types of scheduled tasks using the `IServiceCollection` in a .NET Core application. **Key Concepts:** * **Scheduled Tasks:** Tasks that execute on a scheduled basis. * **ScheduleTaskAttribute:** Custom attribute used to decorate tasks with scheduling information. * **ScheduleMetadataStore:** A custom interface that provides access to scheduled task metadata. * **DemoStore:** An example implementation of `ScheduleMetadataStore` that retrieves metadata from a simulated database. **Registration and Execution:** 1. The code registers two types of scheduled tasks: `KeepAlive` and `DemoConfigTask`. 2. `KeepAlive` task sleeps for 5 seconds and logs a message using `ILogger`. 3. `DemoConfigTask` publishes a completed task with a descriptive message. 4. Another `ScheduleMetadataStore` is registered using a custom `DemoStore` class. 5. The application uses `AddScheduleMetadataStore` to inject this store into the service collection. 6. Both scheduled tasks are registered using `AddScheduleTask` with appropriate configuration. **Benefits of Using Custom Store:** * **Separation of Concerns:** Decouples scheduling logic from the application code. * **Flexibility:** Allows you to use different data sources for storing scheduling metadata. **Additional Details:** * The code also defines a custom attribute `ScheduleTaskAttribute` and uses reflection to dynamically extract scheduling information from decorated tasks. * The `DemoStore` demonstrates a basic implementation of a `ScheduleMetadataStore`, demonstrating how to access and store scheduling metadata. **Overall, this code demonstrates a well-organized and flexible approach to scheduling tasks using custom attributes and a custom store interface.**

正文

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

    public interface IScheduleTask
    {
        Task ExecuteAsync();
    }
    public abstract class ScheduleTask : IScheduleTask
    {
        public virtual Task ExecuteAsync()
        {
            return Task.CompletedTask;
        }
    }

第二步定义特性标注任务执行周期等信的metadata

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
    public class ScheduleTaskAttribute(string cron) : Attribute
    {
        /// <summary>
        /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
        /// 最小单位为分钟
        /// </summary>
        public string Cron { get; set; } = cron;
        public string? Description { get; set; }
        /// <summary>
        /// 是否异步执行.默认false会阻塞接下来的同类任务
        /// </summary>
        public bool IsAsync { get; set; } = false;
        /// <summary>
        /// 是否初始化即启动,默认false
        /// </summary>
        public bool IsStartOnInit { get; set; } = false;
    }

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

    public interface IScheduler
    {
        /// <summary>
        /// 判断当前的任务是否可以执行
        /// </summary>
        bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
    }

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

    public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
    {
        public Type ScheduleTaskType { get; set; } = scheduleTaskType;
        public string Cron { get; set; } = cron;
        public string? Description { get; set; }
        public bool IsAsync { get; set; } = false;
        public bool IsStartOnInit { get; set; } = false;
    }
    public interface IScheduleMetadataStore
    {
        /// <summary>
        /// 获取所有ScheduleTaskMetadata
        /// </summary>
        Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
    }

实现一个Configuration级别的Store

    internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
    {
        const string Key = "BiwenQuickApi:Schedules";

        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
        {
            var options = configuration.GetSection(Key).GetChildren();

            if (options?.Any() is true)
            {
                var metadatas = options.Select(x =>
                {
                    var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
                    if (type is null)
                        throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");

                    return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
                    {
                        Description = x[nameof(ConfigurationScheduleOption.Description)],
                        IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
                        IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
                    };
                });
                return Task.FromResult(metadatas);
            }
            return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
        }
    }

然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

    public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
    {
        /// <summary>
        /// 任务
        /// </summary>
        public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
        /// <summary>
        /// 触发时间
        /// </summary>
        public DateTime EventTime { get; set; } = eventTime;
    }
    /// <summary>
    /// 执行完成
    /// </summary>
    public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
    {
        /// <summary>
        /// 执行结束的时间
        /// </summary>
        public DateTime EndTime { get; set; } = endTime;
    }
    /// <summary>
    /// 执行开始
    /// </summary>
    public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
    /// <summary>
    /// 执行失败
    /// </summary>
    public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
    {
        /// <summary>
        /// 异常信息
        /// </summary>
        public Exception Exception { get; private set; } = exception;
    }

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

    internal class SampleNCrontabScheduler : IScheduler
    {
        /// <summary>
        /// 暂存上次执行时间
        /// </summary>
        private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();

        public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
        {
            var now = DateTime.Now;
            var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
            if (!haveExcuteTime)
            {
                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);

                //如果不是初始化启动,则不执行
                if (!scheduleMetadata.IsStartOnInit)
                    return false;
            }
            if (now >= time)
            {
                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                //更新下次执行时间
                LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
                return true;
            }
            return false;
        }
    }

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:


    internal class ScheduleBackgroundService : BackgroundService
    {
        private static readonly TimeSpan _pollingTime
#if DEBUG
          //轮询20s 测试环境下,方便测试。
          = TimeSpan.FromSeconds(20);
#endif
#if !DEBUG
         //轮询60s 正式环境下,考虑性能轮询时间延长到60s
         = TimeSpan.FromSeconds(60);
#endif
        //心跳10s.
        private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
        private readonly ILogger<ScheduleBackgroundService> _logger;
        private readonly IServiceProvider _serviceProvider;
        public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
                try
                {
                    await RunAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    //todo:
                    _logger.LogError(ex.Message);
                }
                await WaitAsync(pollingDelay, stoppingToken);
            }
        }
        private async Task RunAsync(CancellationToken stoppingToken)
        {
            using var scope = _serviceProvider.CreateScope();
            var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
            if (tasks is null || !tasks.Any())
            {
                return;
            }
            //调度器
            var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
            async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
            {
                if (scheduler.CanRun(metadata, DateTime.Now))
                {
                    var eventTime = DateTime.Now;
                    //通知启动
                    _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
                    try
                    {
                        if (metadata.IsAsync)
                        {
                            //异步执行
                            _ = task.ExecuteAsync();
                        }
                        else
                        {
                            //同步执行
                            await task.ExecuteAsync();
                        }
                        //执行完成
                        _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
                    }
                    catch (Exception ex)
                    {
                        _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
                    }
                }
            };
            //注解中的task
            foreach (var task in tasks)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                //标注的metadatas
                var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();

                if (!metadatas.Any())
                {
                    continue;
                }
                foreach (var metadata in metadatas)
                {
                    await DoTaskAsync(task, metadata);
                }
            }
            //store中的scheduler
            var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();

            //并行执行,提高性能
            Parallel.ForEach(stores, async store =>
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }
                var metadatas = await store.GetAllAsync();
                if (metadatas is null || !metadatas.Any())
                {
                    return;
                }
                foreach (var metadata in metadatas)
                {
                    var attr = new ScheduleTaskAttribute(metadata.Cron)
                    {
                        Description = metadata.Description,
                        IsAsync = metadata.IsAsync,
                        IsStartOnInit = metadata.IsStartOnInit,
                    };

                    var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
                    if (task is null)
                    {
                        return;
                    }
                    await DoTaskAsync(task, attr);
                }
            });
        }

        private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
        {
            try
            {
                await Task.Delay(_minIdleTime, stoppingToken);
                await pollingDelay;
            }
            catch (OperationCanceledException)
            {
            }
        }
    }

最后收尾阶段我们老规矩扩展一下IServiceCollection:

        internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
        {
            foreach (var task in ScheduleTasks)
            {
                services.AddTransient(task);
                services.AddTransient(typeof(IScheduleTask), task);
            }
            //调度器
            services.AddScheduler<SampleNCrontabScheduler>();
            //配置文件Store services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
            //BackgroundService
           services.AddHostedService<ScheduleBackgroundService>();
            return services;
        }
        /// <summary>
        /// 注册调度器AddScheduler
        /// </summary>
        public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
        {
            services.AddSingleton<IScheduler, T>();
            return services;
        }

        /// <summary>
        /// 注册ScheduleMetadataStore
        /// </summary>
        public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
        {
            services.AddSingleton<IScheduleMetadataStore, T>();
            return services;
        }

老规矩我们来测试一下:

    //通过特性标注的方式执行:
    [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
    [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
    public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
    {
        public async Task ExecuteAsync()
        {
            //执行5s
            await Task.Delay(TimeSpan.FromSeconds(5));
            logger.LogInformation("keep alive!");
        }
    }
	public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
    {
        public Task ExecuteAsync()
        {
            logger.LogInformation("Demo Config Schedule Done!");
            return Task.CompletedTask;
        }
    }

通过配置文件的方式配置Store:

{
  "BiwenQuickApi": {
    "Schedules": [
      {
        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
        "Cron": "0/5 * * * *",
        "Description": "Every 5 mins",
        "IsAsync": true,
        "IsStartOnInit": false
      },
      {
        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
        "Cron": "0/10 * * * *",
        "Description": "Every 10 mins",
        "IsAsync": false,
        "IsStartOnInit": true
      }
    ]
  }
}

我们还可以自己实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

    public class DemoStore : IScheduleMetadataStore
    {
        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
        {
            //模拟从数据库或配置文件中获取ScheduleTaskMetadata
            IEnumerable<ScheduleTaskMetadata> metadatas =
                [
                    new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
                    {
                        Description="测试的Schedule"
                    },
                ];
            return Task.FromResult(metadatas);
        }
    }
	
	//然后注册这个Store:
	builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo成功了:
image

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

与自己动手实现一个轻量无负担的任务调度ScheduleTask相似的内容:

自己动手实现一个轻量无负担的任务调度ScheduleTask

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel 这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度 技术栈用到了:BackgroundService和NCrontab库 第一步我们定义一

frida动态插桩初探

前言 近期碰到了分析app的需求,就学习了一下 frida的动态插桩技术。frida是一款轻量级HOOK框架,可用于多平台上,例如android、windows、ios等。frida分为两部分,服务端运行在目标机上,通过注入进程的方式来实现劫持应用函数,另一部分运行在我们自己的控制机上。frida上

5分钟教你搭建邮件服务器的实用指南

今天我写了一篇实用的文章,重点是教你如何免费搭建一个邮件服务器,这个服务器不仅可以用于发送邮件,还可以供我的待办机器人使用。一开始我试图找一些免费的 API 接口来实现这个功能,但遗憾的是,并没有找到合适的。对于程序员来说,能自己动手实现绝对是最好的选择,幸运的是,我有一台空闲的服务器可以利用。如果...

动手造轮子自己实现人工智能神经网络(ANN),解决鸢尾花分类问题Golang1.18实现

人工智能神经网络( Artificial Neural Network,又称为ANN)是一种由人工神经元组成的网络结构,神经网络结构是所有机器学习的基本结构,换句话说,无论是深度学习还是强化学习都是基于神经网络结构进行构建。关于人工神经元,请参见:人工智能机器学习底层原理剖析,人造神经元,您一定能看

Jquery实现复选框的选中和取消

复选框的选中与取消 我在网上看了好多关于这个问题的解答,好多都是一两个按钮的触发事件,有的甚至没有任何效果,经过自己的调试发现这个方法好用一点: 首先我在页面上添加了这样一个复选框 我的复选框是动态加载的,内容是从数据库加载的。 显示的效果图就是以上这样的。 代码 $("#inputc").clic

深入理解 C++ 中的多态与文件操作

C++ 多态 多态(Polymorphism)是面向对象编程(OOP)的核心概念之一,它允许对象在相同操作下表现出不同的行为。在 C++ 中,多态通常通过继承和虚函数来实现。 理解多态 想象一个场景,你有一个动物园,里面有各种动物,如猫、狗、鸟等。每个动物都有自己的叫声。使用面向对象编程,我们可以创

flutter系列之:做一个会飞的菜单

[toc] # 简介 flutter中自带了drawer组件,可以实现通用的菜单功能,那么有没有一种可能,我们可以通过自定义动画来实现一个别样的菜单呢? 答案是肯定的,一起来看看吧。 # 定义一个菜单项目 因为这里的主要目的是实现菜单的动画,所以这里的菜单比较简单,我们的menu是一个Statefu

带你动手做AI版的垃圾分类

摘要:本案例将使用YOLOX模型,实现一个简单的垃圾分类应用。 本文分享自华为云社区《ModelBox社区案例 - 使用YOLOX做垃圾分类》,作者:HWCloudAI。 1 ModelBox社区案例 - 使用YOLOX做垃圾分类 本案例将使用YOLOX模型,实现一个简单的垃圾分类应用,最终效果如下

动手实践丨基于ModelAtrs使用A2C算法制作登月器着陆小游戏

摘要:在本案例中,我们将展示如何基于A2C算法,训练一个LunarLander小游戏。 本文分享自华为云社区《使用A2C算法控制登月器着陆》,作者:HWCloudAI 。 LunarLander是一款控制类的小游戏,也是强化学习中常用的例子。游戏任务为控制登月器着陆,玩家通过操作登月器的主引擎和副引

1.7 完善自定位ShellCode后门

在之前的文章中,我们实现了一个正向的匿名管道`ShellCode`后门,为了保证文章的简洁易懂并没有增加针对调用函数的动态定位功能,此类方法在更换系统后则由于地址变化导致我们的后门无法正常使用,接下来将实现通过PEB获取`GetProcAddrees`函数地址,并根据该函数实现所需其他函数的地址自定位功能,通过枚举内存导出表的方式自动实现定位所需函数的动态地址,从而实现后门的通用性。