- A+
至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用
技术栈用到了:BackgroundService
和NCrontab
库
第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:
public interface IScheduleTask { Task ExecuteAsync(); } public abstract class ScheduleTask : IScheduleTask { public virtual Task ExecuteAsync() { return Task.CompletedTask; } }
第二步定义特性标注任务执行周期等信的metadata
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)] public class ScheduleTaskAttribute(string cron) : Attribute { /// <summary> /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron /// 最小单位为分钟 /// </summary> public string Cron { get; set; } = cron; public string? Description { get; set; } /// <summary> /// 是否异步执行.默认false会阻塞接下来的同类任务 /// </summary> public bool IsAsync { get; set; } = false; /// <summary> /// 是否初始化即启动,默认false /// </summary> public bool IsStartOnInit { get; set; } = false; }
第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:
public interface IScheduler { /// <summary> /// 判断当前的任务是否可以执行 /// </summary> bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime); }
好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:
public class ScheduleTaskMetadata(Type scheduleTaskType, string cron) { public Type ScheduleTaskType { get; set; } = scheduleTaskType; public string Cron { get; set; } = cron; public string? Description { get; set; } public bool IsAsync { get; set; } = false; public bool IsStartOnInit { get; set; } = false; } public interface IScheduleMetadataStore { /// <summary> /// 获取所有ScheduleTaskMetadata /// </summary> Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync(); }
实现一个Configuration级别的Store
internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore { const string Key = "BiwenQuickApi:Schedules"; public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync() { var options = configuration.GetSection(Key).GetChildren(); if (options?.Any() is true) { var metadatas = options.Select(x => { var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!); if (type is null) throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!"); return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!) { Description = x[nameof(ConfigurationScheduleOption.Description)], IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!), IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!), }; }); return Task.FromResult(metadatas); } return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>()); } }
然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent
,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088
public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent { /// <summary> /// 任务 /// </summary> public IScheduleTask ScheduleTask { get; set; } = scheduleTask; /// <summary> /// 触发时间 /// </summary> public DateTime EventTime { get; set; } = eventTime; } /// <summary> /// 执行完成 /// </summary> public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime) { /// <summary> /// 执行结束的时间 /// </summary> public DateTime EndTime { get; set; } = endTime; } /// <summary> /// 执行开始 /// </summary> public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime); /// <summary> /// 执行失败 /// </summary> public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime) { /// <summary> /// 异常信息 /// </summary> public Exception Exception { get; private set; } = exception; }
接下来我们再实现基于NCrontab
的简易调度器,这个调度器主要是解析Cron
表达式判断传入时间是否可以执行ScheduleTask,具体的代码:
internal class SampleNCrontabScheduler : IScheduler { /// <summary> /// 暂存上次执行时间 /// </summary> private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new(); public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime) { var now = DateTime.Now; var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time); if (!haveExcuteTime) { var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime); LastRunTimes.TryAdd(scheduleMetadata, nextStartTime); //如果不是初始化启动,则不执行 if (!scheduleMetadata.IsStartOnInit) return false; } if (now >= time) { var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime); //更新下次执行时间 LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time); return true; } return false; } }
然后就是核心的BackgroundService
了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer
等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:
internal class ScheduleBackgroundService : BackgroundService { private static readonly TimeSpan _pollingTime #if DEBUG //轮询20s 测试环境下,方便测试。 = TimeSpan.FromSeconds(20); #endif #if !DEBUG //轮询60s 正式环境下,考虑性能轮询时间延长到60s = TimeSpan.FromSeconds(60); #endif //心跳10s. private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10); private readonly ILogger<ScheduleBackgroundService> _logger; private readonly IServiceProvider _serviceProvider; public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider) { _logger = logger; _serviceProvider = serviceProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { var pollingDelay = Task.Delay(_pollingTime, stoppingToken); try { await RunAsync(stoppingToken); } catch (Exception ex) { //todo: _logger.LogError(ex.Message); } await WaitAsync(pollingDelay, stoppingToken); } } private async Task RunAsync(CancellationToken stoppingToken) { using var scope = _serviceProvider.CreateScope(); var tasks = scope.ServiceProvider.GetServices<IScheduleTask>(); if (tasks is null || !tasks.Any()) { return; } //调度器 var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>(); async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata) { if (scheduler.CanRun(metadata, DateTime.Now)) { var eventTime = DateTime.Now; //通知启动 _ = new TaskStartedEvent(task, eventTime).PublishAsync(default); try { if (metadata.IsAsync) { //异步执行 _ = task.ExecuteAsync(); } else { //同步执行 await task.ExecuteAsync(); } //执行完成 _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default); } catch (Exception ex) { _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default); } } }; //注解中的task foreach (var task in tasks) { if (stoppingToken.IsCancellationRequested) { break; } //标注的metadatas var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>(); if (!metadatas.Any()) { continue; } foreach (var metadata in metadatas) { await DoTaskAsync(task, metadata); } } //store中的scheduler var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray(); //并行执行,提高性能 Parallel.ForEach(stores, async store => { if (stoppingToken.IsCancellationRequested) { return; } var metadatas = await store.GetAllAsync(); if (metadatas is null || !metadatas.Any()) { return; } foreach (var metadata in metadatas) { var attr = new ScheduleTaskAttribute(metadata.Cron) { Description = metadata.Description, IsAsync = metadata.IsAsync, IsStartOnInit = metadata.IsStartOnInit, }; var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask; if (task is null) { return; } await DoTaskAsync(task, attr); } }); } private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken) { try { await Task.Delay(_minIdleTime, stoppingToken); await pollingDelay; } catch (OperationCanceledException) { } } }
最后收尾阶段我们老规矩扩展一下IServiceCollection
:
internal static IServiceCollection AddScheduleTask(this IServiceCollection services) { foreach (var task in ScheduleTasks) { services.AddTransient(task); services.AddTransient(typeof(IScheduleTask), task); } //调度器 services.AddScheduler<SampleNCrontabScheduler>(); //配置文件Store: services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>(); //BackgroundService services.AddHostedService<ScheduleBackgroundService>(); return services; } /// <summary> /// 注册调度器AddScheduler /// </summary> public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler { services.AddSingleton<IScheduler, T>(); return services; } /// <summary> /// 注册ScheduleMetadataStore /// </summary> public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore { services.AddSingleton<IScheduleMetadataStore, T>(); return services; }
老规矩我们来测试一下:
//通过特性标注的方式执行: [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次 [ScheduleTask("0/3 * * * *")]//每3分钟执行一次 public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask { public async Task ExecuteAsync() { //执行5s await Task.Delay(TimeSpan.FromSeconds(5)); logger.LogInformation("keep alive!"); } } public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask { public Task ExecuteAsync() { logger.LogInformation("Demo Config Schedule Done!"); return Task.CompletedTask; } }
通过配置文件的方式配置Store:
{ "BiwenQuickApi": { "Schedules": [ { "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb", "Cron": "0/5 * * * *", "Description": "Every 5 mins", "IsAsync": true, "IsStartOnInit": false }, { "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb", "Cron": "0/10 * * * *", "Description": "Every 10 mins", "IsAsync": false, "IsStartOnInit": true } ] } }
我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:
public class DemoStore : IScheduleMetadataStore { public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync() { //模拟从数据库或配置文件中获取ScheduleTaskMetadata IEnumerable<ScheduleTaskMetadata> metadatas = [ new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2)) { Description="测试的Schedule" }, ]; return Task.FromResult(metadatas); } } //然后注册这个Store: builder.Services.AddScheduleMetadataStore<DemoStore>();
所有的一切都大功告成,最后我们来跑一下Demo,成功了:
当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!
2024/05/16更新:
提供同一时间单一运行中的任务实现
/// <summary> /// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟. /// </summary> /// <param name="logger"></param> [ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)] public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask { public override Task OnAbort() { logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!"); return Task.CompletedTask; } public override async Task ExecuteAsync() { var now = DateTime.Now; //模拟一个耗时2分钟的任务 await Task.Delay(TimeSpan.FromMinutes(2)); logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!"); } }
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling