- A+
所属分类:.NET技术
前言
在上一篇文章【基于ASP.NET ZERO,开发SaaS版供应链管理系统】中有提到对Webhook功能的扩展改造,本文详细介绍一下具体过程。
Webhook功能操作说明,请参见此文档链接:Webhook数据推送
Webhook功能发布日期:
- ASP.NET Boilerplate(以下简称ABP)在v5.2(2020-02-18)版本中发布了Webhook功能,详细说明,请参见:官方帮助链接;
- ASP.NET ZERO(以下简称ZERO)在v8.2.0(2020-02-20)版本中发布了Webhook功能;
- 我们系统是在2021年4月完成了对Webhook功能的改造:内部接口(用户自行设定接口地址的)、第三方接口(微信内部群、钉钉群、聚水潭API等)。
1、Webhook定义
- 为了区分内部接口与第三方接口,在第三方接口名称前统一附加特定前缀,如:Third.WX.XXX、Third.DD.XXX等;
- 添加定义条目时候设定对应的特性(featureDependency),基于特性功能对不同租户显示或者隐藏定义的条目。
public class AppWebhookDefinitionProvider : WebhookDefinitionProvider { public override void SetWebhooks(IWebhookDefinitionContext context) { //物料档案 - 全部可见 context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Created)); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Updated)); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Deleted)); //生产订单 - 生产管理可见 var featureC = new SimpleFeatureDependency("SCM.C"); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Created, featureDependency: featureC)); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Updated, featureDependency: featureC)); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Deleted, featureDependency: featureC)); context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_MRP_Data, featureDependency: featureC)); //... } }
- 在CoreModule中添加Webhook定义,并设定参数选项:
public class SCMCoreModule : AbpModule { public override void PreInitialize() { Configuration.Webhooks.Providers.Add<AppWebhookDefinitionProvider>(); Configuration.Webhooks.TimeoutDuration = TimeSpan.FromMinutes(1); Configuration.Webhooks.IsAutomaticSubscriptionDeactivationEnabled = true; Configuration.Webhooks.MaxSendAttemptCount = 3; Configuration.Webhooks.MaxConsecutiveFailCountBeforeDeactivateSubscription = 10; //... } //... }
2、Webhook订阅
- 前端用户创建Webhook订阅记录(WebhookUri、Webhooks、Headers等),之后传递到后端API;
- 后端API通过WebhookSubscriptionManager添加保存WebhookSubscription(Webhook订阅):
[AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription)] public class WebhookSubscriptionAppService : SCMAppServiceBase, IWebhookSubscriptionAppService { //... [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription_Create)] public async Task AddSubscription(WebhookSubscription subscription) { subscription.TenantId = AbpSession.TenantId; await _webHookSubscriptionManager.AddOrUpdateSubscriptionAsync(subscription); } //... }
3、Webhook发布(数据推送)
监测实体事件(CreatedEvent、UpdatedEvent、DeletedEvent)数据,按租户用户创建的Webhook订阅,推送数据:
public class T11071001Syncronizer : IEventHandler<EntityCreatedEventData<T11071001>>, IEventHandler<EntityUpdatedEventData<T11071001>>, IEventHandler<EntityDeletedEventData<T11071001>>, ITransientDependency { private readonly IAppWebhookPublisher _appWebhookPublisher; public T11071001Syncronizer(IAppWebhookPublisher appWebhookPublisher) { _appWebhookPublisher = appWebhookPublisher; } public void HandleEvent(EntityCreatedEventData<T11071001> eventData) { DoWebhook("N", eventData.Entity); } public void HandleEvent(EntityUpdatedEventData<T11071001> eventData) { DoWebhook("U", eventData.Entity); } public void HandleEvent(EntityDeletedEventData<T11071001> eventData) { int? tenantId = eventData.Entity.TenantId; string whName = AppWebHookNames.T11071001_Deleted; var subscriptions = _appWebhookPublisher.GetSubscriptions(tenantId, whName); if (subscriptions == null) { return; } _appWebhookPublisher.PublishWebhookUOW(whName, eventData.Entity, tenantId, subscriptions); } }
- DoWebhook()方法:基于具体的订阅(内部接口、第三方接口等)推送对应的内容:
private void DoWebhook(string nu, T11071001 entity) { int? tenantId = entity.TenantId; var whCache = _appWebhookPublisher.GetWebhookCache(tenantId); if (whCache.Count == 0) { return; } string whName = nu == "N" ? AppWebHookNames.T11071001_Created : AppWebHookNames.T11071001_Updated; string whNameWX = AppWebHookNames.WX_T11071001_Created; string whNameDD = AppWebHookNames.DD_T11071001_Created; bool isWH = whCache.Names.ContainsKey(whName); bool isWX = whCache.Names.ContainsKey(whNameWX); bool isDD = whCache.Names.ContainsKey(whNameDD); if (!(isWH || isWX || isDD)) { return; } var data = ObjectMapper.Map<T11071001WebhookDto>(entity); //内部接口 if (isWH) { _appWebhookPublisher.PublishWebhookUOW(whName, data, tenantId, whCache.Names[whName], false); } //企业微信内部群 if (isWX) { var wxData = new WxTCardWebhookDto { template_card = GetWxTCard(data, tenantId, nu) }; _appWebhookPublisher.PublishWebhookUOW(whNameWX, wxData, tenantId, whCache.Names[whNameWX], true); } //钉钉内部群 if (isDD) { var title = GetNUTitle(nu, L(T)); var mdText = GetNewMarkdown(data, title); var ddData = new DdMarkdownWebhookDto { markdown = new DdMarkdownContentDto { title = title, text = mdText } }; _appWebhookPublisher.PublishWebhookUOW(whNameDD, ddData, tenantId, whCache.Names[whNameDD], true); } }
- GetWebhookCache()方法:实现按租户缓存Webhook订阅的数据:
public SCMWebhookCacheItem GetWebhookCache(int? tenantId) { return SetAndGetCache(tenantId); } private SCMWebhookCacheItem SetAndGetCache(int? tenantId, string keyName = "SubscriptionCount") { int tid = tenantId ?? 0; var cacheKey = $"{keyName}-{tid}"; return _cacheManager.GetSCMWebhookCache().Get(cacheKey, () => { int count = 0; var names = new Dictionary<string, List<WebhookSubscription>>(); UnitOfWorkManager.WithUnitOfWork(() => { using (UnitOfWorkManager.Current.SetTenantId(tenantId)) { if (_featureChecker.IsEnabled(tid, "SCM.H")) //Feature核查 { var items = _webhookSubscriptionRepository.GetAllList(e => e.TenantId == tenantId && e.IsActive == true); count = items.Count; foreach (var item in items) { if (string.IsNullOrWhiteSpace(item.Webhooks)) { continue; } var whNames = JsonHelper.DeserializeObject<string[]>(item.Webhooks); if (whNames == null) { continue; } foreach (string whName in whNames) { if (names.ContainsKey(whName)) { names[whName].Add(item.ToWebhookSubscription()); } else { names.Add(whName, new List<WebhookSubscription> { item.ToWebhookSubscription() }); } } } } } }); return new SCMWebhookCacheItem(count, names); }); }
- PublishWebhookUOW()方法:替换ABP中WebHookPublisher的默认实现,直接按传入的订阅,通过WebhookSenderJob推送数据:
public void PublishWebhookUOW(string webHookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions = null, bool sendExactSameData = false) { UnitOfWorkManager.WithUnitOfWork(() => { using (UnitOfWorkManager.Current.SetTenantId(tenantId)) { Publish(webHookName, data, tenantId, webhookSubscriptions, sendExactSameData); } }); } private void Publish(string webhookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions, bool sendExactSameData = false) { if (string.IsNullOrWhiteSpace(webhookName)) { return; } //若无直接传入订阅则按webhookName查询 webhookSubscriptions ??= _webhookSubscriptionRepository.GetAllList(subscriptionInfo => subscriptionInfo.TenantId == tenantId && subscriptionInfo.IsActive && subscriptionInfo.Webhooks.Contains(""" + webhookName + """) ).Select(subscriptionInfo => subscriptionInfo.ToWebhookSubscription()).ToList(); if (webhookSubscriptions.IsNullOrEmpty()) { return; } var webhookInfo = SaveAndGetWebhookEvent(tenantId, webhookName, data); foreach (var webhookSubscription in webhookSubscriptions) { var jobArgs = new WebhookSenderArgs { TenantId = webhookSubscription.TenantId, WebhookEventId = webhookInfo.Id, Data = webhookInfo.Data, WebhookName = webhookInfo.WebhookName, WebhookSubscriptionId = webhookSubscription.Id, Headers = webhookSubscription.Headers, Secret = webhookSubscription.Secret, WebhookUri = webhookSubscription.WebhookUri, SendExactSameData = sendExactSameData }; //指定队列执行任务,由触发事件的server执行 IBackgroundJobClient hangFireClient = new BackgroundJobClient(); hangFireClient.Create<WebhookSenderJob>(x => x.ExecuteAsync(jobArgs), new EnqueuedState(AppVersionHelper.MachineName)); } }
- WebhookSenderJob:重写WebhookManager的SignWebhookRequest方法,对于第三方接口,不添加签名的Header:
public override void SignWebhookRequest(HttpRequestMessage request, string serializedBody, string secret) { if (request == null) { throw new ArgumentNullException(nameof(request)); } //第三方接口,不添加签名Header if (IsThirdAPI(request)) { return; } if (string.IsNullOrWhiteSpace(serializedBody)) { throw new ArgumentNullException(nameof(serializedBody)); } var secretBytes = Encoding.UTF8.GetBytes(secret); using (var hasher = new HMACSHA256(secretBytes)) { request.Content = new StringContent(serializedBody, Encoding.UTF8, "application/json"); var data = Encoding.UTF8.GetBytes(serializedBody); var sha256 = hasher.ComputeHash(data); var headerValue = string.Format(CultureInfo.InvariantCulture, SignatureHeaderValueTemplate, BitConverter.ToString(sha256)); request.Headers.Add(SignatureHeaderName, headerValue); } }
- WebhookSenderJob:重写WebhookSender的CreateWebhookRequestMessage方法,对于第三方接口,进行特殊处理:
protected override HttpRequestMessage CreateWebhookRequestMessage(WebhookSenderArgs webhookSenderArgs) { return webhookSenderArgs.WebhookName switch { AppWebHookNames.JST_supplier_upload => JSTHttpRequestMessage(webhookSenderArgs), //聚水潭 - 供应商上传 //... _ => new HttpRequestMessage(HttpMethod.Post, webhookSenderArgs.WebhookUri) }; }
- WebhookSenderJob:重写WebhookSender的AddAdditionalHeaders方法, 对于第三方接口,不添加Headers:
protected override void AddAdditionalHeaders(HttpRequestMessage request, WebhookSenderArgs webhookSenderArgs) { //第三方接口,不添加Header if (IsThirdAPI(request)) { return; } foreach (var header in webhookSenderArgs.Headers) { if (request.Headers.TryAddWithoutValidation(header.Key, header.Value)) { continue; } if (request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value)) { continue; } throw new Exception($"Invalid Header. SubscriptionId:{webhookSenderArgs.WebhookSubscriptionId},Header: {header.Key}:{header.Value}"); } }
- WebhookSenderJob:重写WebhookSender的SendHttpRequest方法,处理第三方接口的回传数据:
protected override async Task<(bool isSucceed, HttpStatusCode statusCode, string content)> SendHttpRequest(HttpRequestMessage request) { using var client = _httpClientFactory.CreateClient(); //避免使用 new HttpClient()方式 client.Timeout = _webhooksConfiguration.TimeoutDuration; var response = await client.SendAsync(request); var isSucceed = response.IsSuccessStatusCode; var statusCode = response.StatusCode; var content = await response.Content.ReadAsStringAsync(); //第三方接口,需要处理回传的数据 if (IsThirdAPI(request)) { string method = TryGetHeader(request.Headers, "ThirdAPI1"); int tenantId = Convert.ToInt32(TryGetHeader(request.Headers, "ThirdAPI2")); switch (method) { case AppWebHookNames.JST_supplier_upload: await JSTSupplierUploadResponse(method, content, tenantId); break; //... default: break; } } return (isSucceed, statusCode, content); }
总结
基于ABP/ZERO的Webhook功能实现,进行一些扩展改造,可以实现业务数据按用户订阅进行推送,包括推送到第三方接口(企业微信群、钉钉等),在很大程度上提升了业务系统的灵活性与实用性。