通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本

  • 通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本已关闭评论
  • 193 次浏览
  • A+
所属分类:.NET技术
摘要

     过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。


简介

     过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。

调度频率设置

       首先在开始之前,先看看如何通过脚本分配多种调度计划,先看下表:

方法 描述
EveryMinute() 每分钟执行一次任务
EveryFiveMinutes(); 每五分钟执行一次任务
EveryTenMinutes();  每十分钟执行一次任务
EveryThirtyMinutes() 每半小时执行一次任务
Hourly(); 每小时执行一次任务
HourlyAt(10) 每一个小时的第 10 分钟运行一次
Daily() 每到午夜执行一次任务
DailyAt("3:00") 在 3:00 执行一次任务
TwiceDaily(1, 3) 在 1:00 和 3:00 分别执行一次任务
Weekly() 每周执行一次任务
Monthly() 每月执行一次任务
MonthlyOn(4, "3:00") 在每个月的第四天的 3:00 执行一次任务
Quarterly() 每季度执行一次任务
Yearly() 每年执行一次任务
Timezone("utc") 设置utc时区

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

举个例子,在工作日每三秒在时间8:00-23:30内执行任务。脚本如下:

parser.TimeZone(""utc"")       .Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")

 额外的限制条件列表如下:

方法 描述
Weekdays() 限制任务在工作日
Sundays() 限制任务在星期日
Mondays() 限制任务在星期一
Tuesdays() 限制任务在星期二
Wednesdays() 限制任务在星期三
Thursdays() 限制任务在星期四
Fridays() 限制任务在星期五
Saturdays() 限制任务在星期六
When( function(lastExecTime)) 限制任务基于一个script脚本返回为真的验证
Skip( function(lastExecTime)) 限制任务基于一个script脚本返回为假的验证

 

 

 

 

 

 

 

 

 

 

 

 

举个例子,在工作日每三秒在时间8:00-23:30内执行任务。如果设置When返回为true,skip返回false 就会执行,脚本如下:

parser.TimeZone(""utc"")        .When(function(lastExecTime){                 return true;             })        .Skip(              function(lastExecTime){                 return false;             })       .Weekdays()       .SecondAt(3)        .Between(""8:00"", ""23:30"")

然后在function 脚本中支持DateUtils对象,可以针对lastExecTime进行逻辑判断,比如是否是周末:DateUtils.IsWeekend(lastExecTime), 是否是今天DateUtils.IsToday(lastExecTime),代码如下:

 

parser.TimeZone(""utc"")        .When(function(lastExecTime){                return DateUtils.IsToday(lastExecTime);             })        .Skip(              function(lastExecTime){                 return DateUtils.IsWeekend(lastExecTime);             })       .Weekdays()       .SecondAt(3)        .Between(""8:00"", ""23:30"")

 

编写调度服务

surging微服务引擎是支持后台管理托管服务的,如果要基于BackgroundService编写任务调度,那服务就要继承BackgroundServiceBehavior,还要继承ISingleInstance以设置注入单例模式,

首先,创建接口服务,这样就可以远程添加任务,开启关闭服务了,代码如下:

   [ServiceBundle("Background/{Service}")]     public interface IWorkService : IServiceKey     {         Task<bool> AddWork(Message message);           Task StartAsync();          Task StopAsync();     }

然后创建业务领域服务,以下代码是通过规则引擎自定义脚本设置执行频率,并且可以设置execsize 以标识同时执行任务的大小,通过以下业务逻辑代码大家可以扩展支持持久化。

public class WorkService : BackgroundServiceBehavior, IWorkService, ISingleInstance     {         private readonly ILogger<WorkService> _logger;         private readonly Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>> _queue = new Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>>();         private readonly ConcurrentDictionary<string, DateTime> _keyValuePairs = new ConcurrentDictionary<string, DateTime>();         private readonly IServiceProxyProvider _serviceProxyProvider;         private AtomicLong _atomic=new AtomicLong(1);         private const int EXECSIZE = 1;         private CancellationToken _token;          public WorkService(ILogger<WorkService> logger, IServiceProxyProvider serviceProxyProvider)         {             _logger = logger;             _serviceProxyProvider = serviceProxyProvider;             /*   var script = @"parser                                .Weekdays().SecondAt(3).Between(""8:00"", ""22:00"")";*/             var script = @"parser                               .TimeZone(""utc"")                                .When(                               function(lastExecTime){                 return DateUtils.IsToday(lastExecTime);             }).Skip(              function(lastExecTime){                 return DateUtils.IsWeekend(lastExecTime);             }).Weekdays().SecondAt(3).Between(""8:00"", ""23:30"")";             var ruleWorkflow = GetSchedulerRuleWorkflow(script);             var messageId = Guid.NewGuid().ToString();             _keyValuePairs.AddOrUpdate(messageId, DateTime.Now, (key, value) => DateTime.Now);             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(new Message() { MessageId= messageId,Config=new SchedulerConfig() {  IsPersistence=true} }, GetRuleEngine(ruleWorkflow), ruleWorkflow));          }          public  Task<bool> AddWork(Message message)         {             var ruleWorkflow = GetSchedulerRuleWorkflow(message.Config.Script);             _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, GetRuleEngine(ruleWorkflow), ruleWorkflow));             return Task.FromResult(true);         }          protected override async  Task ExecuteAsync(CancellationToken stoppingToken)         {             try             {                 _token = stoppingToken;                 _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);                  _queue.TryDequeue(out Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>? message);                 if (message != null)                 {                     var parser = await GetParser(message.Item3, message.Item2);                     await PayloadSubscribe(parser, message.Item1, message.Item2, message.Item3);                     _keyValuePairs.TryGetValue(message.Item1.MessageId, out DateTime dateTime);                     parser.Build(dateTime == DateTime.MinValue ? DateTime.Now : dateTime);                 }                 if (!_token.IsCancellationRequested && (message == null || _atomic.GetAndAdd(1) == EXECSIZE))                 {                     _atomic = new AtomicLong(1);                     await Task.Delay(1000, stoppingToken);                  }             }             catch (Exception ex){                 _logger.LogError("WorkService execute error, message:{message} ,trace info:{trace} ", ex.Message, ex.StackTrace);             }         }          public async Task StartAsync()         {             if (_token.IsCancellationRequested)             {                  await base.StartAsync(_token);             }         }          public async Task StopAsync()         {             if (!_token.IsCancellationRequested)             {                await  base.StopAsync(_token);             }         }          private async Task PayloadSubscribe(RulePipePayloadParser parser, Message message, RulesEngine.RulesEngine rulesEngine, SchedulerRuleWorkflow ruleWorkflow)         {             parser.HandlePayload().Subscribe(async (temperature) =>             {                 try                 {                     if (temperature)                     {                        await  ExecuteByPlanAsyn(message);                         _logger.LogInformation("Worker exec at: {time}", DateTimeOffset.Now);                      }                 }                 catch (Exception ex) { }                 finally                 {                     if (message.Config.IsPersistence || (!temperature && !message.Config.IsPersistence))                         _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, rulesEngine, ruleWorkflow));                  }             });         }          private async Task<bool> ExecuteByPlanAsyn(Message message)         {             var result = false;             var isExec = true;             try             {                 if (!string.IsNullOrEmpty(message.RoutePath))                 {                     var serviceResult = await _serviceProxyProvider.Invoke<object>(message.Parameters, message.RoutePath, message.ServiceKey);                     bool.TryParse(serviceResult?.ToString(), out result);                     isExec = true;                 }             }             catch { }             finally             {                 if (isExec && message.Config.IsPersistence)                     _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);                 else if (!message.Config.IsPersistence)                     _keyValuePairs.TryRemove(message.MessageId, out DateTime dateTime);             }             return result;         }          private async Task<RulePipePayloadParser> GetParser(SchedulerRuleWorkflow ruleWorkflow, RulesEngine.RulesEngine engine)         {             var payloadParser = new RulePipePayloadParser();             var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });             if (ruleResult.Exception != null && _logger.IsEnabled(LogLevel.Error))                 _logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);             return payloadParser;         }          private RulesEngine.RulesEngine GetRuleEngine(SchedulerRuleWorkflow ruleWorkFlow)         {             var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };             var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkFlow.GetWorkflow() }, null, reSettingsWithCustomTypes);             return result;         }          private SchedulerRuleWorkflow GetSchedulerRuleWorkflow(string script)         {             var result = new SchedulerRuleWorkflow("1==1");             if (!string.IsNullOrEmpty(script))             {                 result = new SchedulerRuleWorkflow(script);             }             return result;         }     }

总结

因为工作繁忙,微服务平台暂时搁置,等公司基于surging 的物联网项目上线后,再投入时间研发,surging 一直开发中未曾放弃,也许你没看到的版本才是最强大的。之前的QQ群被封了,如果感兴趣可以加:744677125

开源地址:https://github.com/fanliang11/surging