NETCore中实现一个轻量无负担的极简任务调度ScheduleTask

  • NETCore中实现一个轻量无负担的极简任务调度ScheduleTask已关闭评论
  • 54 次浏览
  • A+
所属分类:.NET技术
摘要

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

    public interface IScheduleTask     {         Task ExecuteAsync();     }     public abstract class ScheduleTask : IScheduleTask     {         public virtual Task ExecuteAsync()         {             return Task.CompletedTask;         }     } 

第二步定义特性标注任务执行周期等信的metadata

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]     public class ScheduleTaskAttribute(string cron) : Attribute     {         /// <summary>         /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron         /// 最小单位为分钟         /// </summary>         public string Cron { get; set; } = cron;         public string? Description { get; set; }         /// <summary>         /// 是否异步执行.默认false会阻塞接下来的同类任务         /// </summary>         public bool IsAsync { get; set; } = false;         /// <summary>         /// 是否初始化即启动,默认false         /// </summary>         public bool IsStartOnInit { get; set; } = false;     } 

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

    public interface IScheduler     {         /// <summary>         /// 判断当前的任务是否可以执行         /// </summary>         bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);     } 

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

    public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)     {         public Type ScheduleTaskType { get; set; } = scheduleTaskType;         public string Cron { get; set; } = cron;         public string? Description { get; set; }         public bool IsAsync { get; set; } = false;         public bool IsStartOnInit { get; set; } = false;     }     public interface IScheduleMetadataStore     {         /// <summary>         /// 获取所有ScheduleTaskMetadata         /// </summary>         Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();     } 

实现一个Configuration级别的Store

    internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore     {         const string Key = "BiwenQuickApi:Schedules";          public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()         {             var options = configuration.GetSection(Key).GetChildren();              if (options?.Any() is true)             {                 var metadatas = options.Select(x =>                 {                     var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);                     if (type is null)                         throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");                      return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)                     {                         Description = x[nameof(ConfigurationScheduleOption.Description)],                         IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),                         IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),                     };                 });                 return Task.FromResult(metadatas);             }             return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());         }     } 

然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

    public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent     {         /// <summary>         /// 任务         /// </summary>         public IScheduleTask ScheduleTask { get; set; } = scheduleTask;         /// <summary>         /// 触发时间         /// </summary>         public DateTime EventTime { get; set; } = eventTime;     }     /// <summary>     /// 执行完成     /// </summary>     public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)     {         /// <summary>         /// 执行结束的时间         /// </summary>         public DateTime EndTime { get; set; } = endTime;     }     /// <summary>     /// 执行开始     /// </summary>     public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);     /// <summary>     /// 执行失败     /// </summary>     public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)     {         /// <summary>         /// 异常信息         /// </summary>         public Exception Exception { get; private set; } = exception;     } 

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

    internal class SampleNCrontabScheduler : IScheduler     {         /// <summary>         /// 暂存上次执行时间         /// </summary>         private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();          public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)         {             var now = DateTime.Now;             var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);             if (!haveExcuteTime)             {                 var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);                 LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);                  //如果不是初始化启动,则不执行                 if (!scheduleMetadata.IsStartOnInit)                     return false;             }             if (now >= time)             {                 var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);                 //更新下次执行时间                 LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);                 return true;             }             return false;         }     } 

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:

     internal class ScheduleBackgroundService : BackgroundService     {         private static readonly TimeSpan _pollingTime #if DEBUG           //轮询20s 测试环境下,方便测试。           = TimeSpan.FromSeconds(20); #endif #if !DEBUG          //轮询60s 正式环境下,考虑性能轮询时间延长到60s          = TimeSpan.FromSeconds(60); #endif         //心跳10s.         private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);         private readonly ILogger<ScheduleBackgroundService> _logger;         private readonly IServiceProvider _serviceProvider;         public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)         {             _logger = logger;             _serviceProvider = serviceProvider;         }         protected override async Task ExecuteAsync(CancellationToken stoppingToken)         {             while (!stoppingToken.IsCancellationRequested)             {                 var pollingDelay = Task.Delay(_pollingTime, stoppingToken);                 try                 {                     await RunAsync(stoppingToken);                 }                 catch (Exception ex)                 {                     //todo:                     _logger.LogError(ex.Message);                 }                 await WaitAsync(pollingDelay, stoppingToken);             }         }         private async Task RunAsync(CancellationToken stoppingToken)         {             using var scope = _serviceProvider.CreateScope();             var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();             if (tasks is null || !tasks.Any())             {                 return;             }             //调度器             var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();             async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)             {                 if (scheduler.CanRun(metadata, DateTime.Now))                 {                     var eventTime = DateTime.Now;                     //通知启动                     _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);                     try                     {                         if (metadata.IsAsync)                         {                             //异步执行                             _ = task.ExecuteAsync();                         }                         else                         {                             //同步执行                             await task.ExecuteAsync();                         }                         //执行完成                         _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);                     }                     catch (Exception ex)                     {                         _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);                     }                 }             };             //注解中的task             foreach (var task in tasks)             {                 if (stoppingToken.IsCancellationRequested)                 {                     break;                 }                 //标注的metadatas                 var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();                  if (!metadatas.Any())                 {                     continue;                 }                 foreach (var metadata in metadatas)                 {                     await DoTaskAsync(task, metadata);                 }             }             //store中的scheduler             var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();              //并行执行,提高性能             Parallel.ForEach(stores, async store =>             {                 if (stoppingToken.IsCancellationRequested)                 {                     return;                 }                 var metadatas = await store.GetAllAsync();                 if (metadatas is null || !metadatas.Any())                 {                     return;                 }                 foreach (var metadata in metadatas)                 {                     var attr = new ScheduleTaskAttribute(metadata.Cron)                     {                         Description = metadata.Description,                         IsAsync = metadata.IsAsync,                         IsStartOnInit = metadata.IsStartOnInit,                     };                      var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;                     if (task is null)                     {                         return;                     }                     await DoTaskAsync(task, attr);                 }             });         }          private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)         {             try             {                 await Task.Delay(_minIdleTime, stoppingToken);                 await pollingDelay;             }             catch (OperationCanceledException)             {             }         }     } 

最后收尾阶段我们老规矩扩展一下IServiceCollection:

        internal static IServiceCollection AddScheduleTask(this IServiceCollection services)         {             foreach (var task in ScheduleTasks)             {                 services.AddTransient(task);                 services.AddTransient(typeof(IScheduleTask), task);             }             //调度器             services.AddScheduler<SampleNCrontabScheduler>();             //配置文件Store: 	services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();             //BackgroundService            services.AddHostedService<ScheduleBackgroundService>();             return services;         }         /// <summary>         /// 注册调度器AddScheduler         /// </summary>         public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler         {             services.AddSingleton<IScheduler, T>();             return services;         }          /// <summary>         /// 注册ScheduleMetadataStore         /// </summary>         public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore         {             services.AddSingleton<IScheduleMetadataStore, T>();             return services;         } 

老规矩我们来测试一下:

    //通过特性标注的方式执行:     [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次     [ScheduleTask("0/3 * * * *")]//每3分钟执行一次     public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask     {         public async Task ExecuteAsync()         {             //执行5s             await Task.Delay(TimeSpan.FromSeconds(5));             logger.LogInformation("keep alive!");         }     } 	public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask     {         public Task ExecuteAsync()         {             logger.LogInformation("Demo Config Schedule Done!");             return Task.CompletedTask;         }     } 

通过配置文件的方式配置Store:

{   "BiwenQuickApi": {     "Schedules": [       {         "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",         "Cron": "0/5 * * * *",         "Description": "Every 5 mins",         "IsAsync": true,         "IsStartOnInit": false       },       {         "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",         "Cron": "0/10 * * * *",         "Description": "Every 10 mins",         "IsAsync": false,         "IsStartOnInit": true       }     ]   } } 

我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

    public class DemoStore : IScheduleMetadataStore     {         public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()         {             //模拟从数据库或配置文件中获取ScheduleTaskMetadata             IEnumerable<ScheduleTaskMetadata> metadatas =                 [                     new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))                     {                         Description="测试的Schedule"                     },                 ];             return Task.FromResult(metadatas);         }     } 	//然后注册这个Store: 	builder.Services.AddScheduleMetadataStore<DemoStore>(); 

所有的一切都大功告成,最后我们来跑一下Demo,成功了:
NETCore中实现一个轻量无负担的极简任务调度ScheduleTask

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

2024/05/16更新:
提供同一时间单一运行中的任务实现

/// <summary> /// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟. /// </summary> /// <param name="logger"></param> [ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]     public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask     {         public override Task OnAbort()         {             logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");             return Task.CompletedTask;         }          public override async Task ExecuteAsync()         {             var now = DateTime.Now;             //模拟一个耗时2分钟的任务             await Task.Delay(TimeSpan.FromMinutes(2));             logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");         }     } 

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling