手写MSMQ微软消息队列收发工具类

  • 手写MSMQ微软消息队列收发工具类已关闭评论
  • 16 次浏览
  • A+
所属分类:.NET技术
摘要

一、MSMQ介绍MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。

一、MSMQ介绍

MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。

两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。

应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。

二、工具类封装

封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:

  • 功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。

  • 配置简单。

  • 同时支持多个队列的收发处理。

     

几个核心类

  • MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。

手写MSMQ微软消息队列收发工具类手写MSMQ微软消息队列收发工具类

   public abstract class MQHandler     {         protected MqCfgEntity cfg = null;         //是否正在处理消息         private bool IsProcessingMessage = false;          public MQHandler(MqCfgEntity cfg)         {             this.cfg = cfg;         }          public void Start()         {             while (true)             {                 try                 {                     if (IsProcessingMessage) return;                     IsProcessingMessage = true;//正在处理消息                     if ("Send".Equals(cfg.ProcessType))                     {                         Send();                     }                     else if ("Receive".Equals(cfg.ProcessType))                     {                         Receive();                     }                     IsProcessingMessage = false; //消息处理完成                 }                 catch (Exception ex)                 {                     Log4Net.Error($"{ex.Message},{ex.StackTrace}");                     IsProcessingMessage = false; //消息处理完成                 }                 Thread.Sleep(cfg.SplitSeconds * 1000);             }         }          /// <summary>         /// 备份已发送的文件,file为待备份完整文件名         /// </summary>         protected void BackSendFile(string file)         {             if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())             {                 FileInfo info = new FileInfo(file);                 string backPath = cfg.BackPath;                 if (cfg.BackFormat.IsNotNullOrEmpty())                 {                     backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";                     if (!Directory.Exists(backPath))                     {                         Directory.CreateDirectory(backPath);                     }                 }                 string backName = $"{backPath}/{info.Name}";                 if (File.Exists(backName))                 {                     backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";                 }                 File.Move(file, backName);             }         }          private void Send()         {             var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);             if (names != null && names.Count() > 0)             {                 var files = names.ToList().Select(f => f.ForMatPath());                 foreach (var file in files)                 {                     if (file.TryOpenFile() && Send(file))                     {                         Log4Net.Info($"{file}已发送");                         BackSendFile(file);                         Log4Net.Info($"{file}已备份");                     }                 }             }         }          public abstract bool Send(string file);          public abstract void Receive();     }

View Code

  • MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。

手写MSMQ微软消息队列收发工具类手写MSMQ微软消息队列收发工具类

   public class MSMQHandler : MQHandler     {         private XmlMessageFormatter formatter = null;         //把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱         private Encoding encoding = Encoding.UTF8;          public MSMQHandler(MqCfgEntity cfg) : base(cfg)         {             Type type = GetMsgType(cfg.MessageType);             formatter = new XmlMessageFormatter(new Type[] { type });         }          public override void Receive()         {             MessageQueue queue = null;             try             {                 queue = new MessageQueue(cfg.Queue);                 int num = queue.GetAllMessages().Length;                 for (int i = 0; i < num; i++)                 {                     ReceiveMessage(queue);                 }             }             catch (Exception ex)             {                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (queue != null) queue.Dispose();             }         }          private void ReceiveMessage(MessageQueue queue)         {             System.Messaging.Message message = null;             try             {                 message = queue.Receive();                 message.Formatter = formatter;                 string toFile = $"{cfg.FilePath}/{message.Label}";                 if ("file".Equals(cfg.MessageType))                 {                     SaveMessageAsBinaryFile(message, toFile);                 }                 else if ("xml".Equals(cfg.MessageType))                 {                     var doc = (XmlDocument)message.Body;                     doc.Save(toFile);                 }                 else if ("txt".Equals(cfg.MessageType))                 {                     var txt = (string)message.Body;                     SaveMessageAsTxt(message, toFile);                 }                 Log4Net.Info($"收到消息,已保存,{toFile}");             }             catch (Exception ex)             {                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (message != null) message.Dispose();             }         }          private void SaveMessageAsTxt(Message message, string toFile)         {             FileStream fs = null;             try             {                 fs = new FileStream(toFile, FileMode.Create);                 string content = (string)message.Body;                 var bts = encoding.GetBytes(content);                 fs.Write(bts, 0, bts.Length);             }             catch (Exception ex)             {                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (fs != null) fs.Dispose();             }         }          private void SaveMessageAsBinaryFile(Message message, string toFile)         {             FileStream fs = null;             try             {                 fs = new FileStream(toFile, FileMode.Create);                 var bts = (byte[])message.Body;                 fs.Write(bts, 0, bts.Length);             }             catch (Exception ex)             {                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (fs != null) fs.Dispose();             }         }          public override bool Send(string file)         {             bool success = true;             FileInfo fileInfo = new FileInfo(file);             MessageQueue myQueue = null;             try             {                 myQueue = new MessageQueue(cfg.Queue);                 object body = null;                 if ("file".Equals(cfg.MessageType))                 {                     FileStream fs = null;                     try                     {                         fs = new FileStream(file, FileMode.Open);                         byte[] bts = new byte[fs.Length];                         fs.Read(bts, 0, bts.Length);                         body = bts;                     }                     catch (Exception ex)                     {                         Log4Net.Error($"{ex.Message},{ex.StackTrace}");                     }                     finally                     {                         if (fs != null) fs.Dispose();                     }                 }                 else if ("xml".Equals(cfg.MessageType))                 {                     XmlDocument doc = new XmlDocument();                     doc.Load(file);                     body = doc;                 }                 else if ("txt".Equals(cfg.MessageType))                 {                     FileStream fs = null;                     try                     {                         fs = new FileStream(file, FileMode.Open);                         byte[] bts = new byte[fs.Length];                         fs.Read(bts, 0, bts.Length);                         string content = encoding.GetString(bts);                         body = content;                     }                     catch (Exception ex)                     {                         Log4Net.Error($"{ex.Message},{ex.StackTrace}");                     }                     finally                     {                         if (fs != null) fs.Dispose();                     }                 }                 Push(fileInfo.Name, myQueue, body);             }             catch (Exception ex)             {                 success = false;                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (myQueue != null) myQueue.Dispose();             }             return success;         }          //往队列上推送消息         private void Push(string fileName, MessageQueue myQueue, object body)         {             System.Messaging.Message message = null;             try             {                 message = new System.Messaging.Message(body);                 message.Formatter = formatter;                 message.Label = fileName;                 if (cfg.IsTransQueue)                 {                     using (MessageQueueTransaction trans = new MessageQueueTransaction())                     {                         trans.Begin();                         myQueue.Send(message, trans);                         trans.Commit();                     }                 }                 else                 {                     myQueue.Send(message);                 }             }             catch (Exception ex)             {                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");             }             finally             {                 if (message != null) message.Dispose();             }         }          /// <summary>         /// 根据配置文件的类型,返回MQ队列上的消息类型         /// </summary>         private Type GetMsgType(string code)         {             Type type = null;             switch (code)             {                 case "file": type = typeof(byte[]); break;                 case "txt": type = typeof(string); break;                 case "xml": type = typeof(XmlDocument); break;             }             return type;         }     }

View Code

 

配置文件说明

  • mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。

  • Msmq节点字段说明:

    • ProcessType:Send或Receive,表示用于发送或接收消息。

    • Queue:队列名称。

    • FilePath:待发送的文件所在目录,或接收到的文件的存放目录。

    • FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。

    • SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。

    • BackPath:Send时才配置,消息发送以后文件备份到哪个目录。

    • BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。

    • MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。

    • IsTransQueue:true或false,表示队列是否为事务性队列。

其它说明

  • 程序运行环境:.net framework 4.5+

  • 程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。

  • 完整项目代码:关注以下公众号,后台回复"msmq"获取

手写MSMQ微软消息队列收发工具类