- A+
所属分类:.NET技术
十年河东,十年河西,莫欺少年穷
学无止境,精益求精
netcore3.1控制台应用程序,引入MQTTnet 2.8版本
订阅端:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Client;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace swapConsole
{
    /// <summary>
    /// Subscriber sample for MQTTnet 2.8: connects to the broker, subscribes to
    /// <c>topic</c> every time a connection is established, and prints each
    /// received payload to the console.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Pause before a reconnect attempt so that an unreachable broker does not
        // turn the Disconnected -> Connect -> Disconnected cycle into a busy loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Built once: the client id must stay the same across reconnects, otherwise
        // WithCleanSession(false) can never resume the previous session.
        private static readonly IMqttClientOptions cachedOptions = BuildOptions();

        /// <summary>
        /// Connection options shared by the initial connect and every reconnect.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get { return cachedOptions; }
        }

        /// <summary>
        /// Creates the connection options (persistent session, empty credentials,
        /// random client id, broker on the local machine, port 1883).
        /// </summary>
        private static IMqttClientOptions BuildOptions()
        {
            MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
            builder.WithCleanSession(false);
            // user name / password
            builder.WithCredentials("", "");
            builder.WithClientId(Guid.NewGuid().ToString());
            // The original "1270.0.0.0" is not a valid IPv4 address.
            builder.WithTcpServer("127.0.0.1", 1883);
            return builder.Build();
        }

        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("尝试重连!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the MQTT broker; a failure is reported on the console
        /// instead of being propagated to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised when the connection to the broker is established; subscribes to the topic.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);

            // async void is acceptable for an event handler, but nothing may escape
            // it: an unhandled exception here would terminate the process.
            try
            {
                await SubscribeInfo();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"订阅[{topic}]主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised for every message received; prints the payload decoded as UTF-8.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Carries the received application message.</param>
        private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // Guard against a message without payload: GetString(null) throws.
            byte[] payload = e.ApplicationMessage.Payload ?? Array.Empty<byte>();
            Console.WriteLine($">> {Encoding.UTF8.GetString(payload)}{Environment.NewLine}");
        }

        /// <summary>
        /// Subscribes to <c>topic</c> with QoS 2 (exactly once). Does nothing except
        /// print a notice when the topic is empty or the client is not connected.
        /// </summary>
        /// <returns>A task that completes once the subscribe call has completed.</returns>
        public static async Task SubscribeInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("订阅主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }

            // Await the call so the success line is printed only after it has
            // completed and failures surface to the caller.
            await mqttClient.SubscribeAsync(new List<TopicFilter>
            {
                new TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
            });
            Console.WriteLine($"已订阅[{topic}]主题" + Environment.NewLine);
        }

        /// <summary>
        /// Unsubscribes from <c>topic</c>. Does nothing except print a notice when
        /// the topic is empty or the client is not connected.
        /// </summary>
        /// <returns>A task that completes once the unsubscribe call has completed.</returns>
        public static async Task UnSubscribeInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("退订主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }

            await mqttClient.UnsubscribeAsync(topic);
            Console.WriteLine($"已退订[{topic}]主题" + Environment.NewLine);
        }
    }
}
View Code
发布端:
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace swapPublish
{
    /// <summary>
    /// Publisher sample for MQTTnet 2.8: connects to the broker and, each time a
    /// connection is established, publishes ten timestamp messages to <c>topic</c>
    /// at two-second intervals. Pressing Enter disconnects and ends the program.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Number of messages sent per connection and the pause between them.
        private const int MessageCount = 10;
        private static readonly TimeSpan PublishInterval = TimeSpan.FromSeconds(2);

        // Pause before a reconnect attempt so that an unreachable broker does not
        // turn the Disconnected -> Connect -> Disconnected cycle into a busy loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Set when the user asked to quit, so the Disconnected handler does not
        // reconnect after the deliberate disconnect in Main.
        private static volatile bool isShuttingDown;

        // Built once so that every reconnect reuses the same client id instead of
        // registering a new persistent session (CleanSession = false) on the broker.
        private static readonly IMqttClientOptions cachedOptions = BuildOptions();

        /// <summary>
        /// Connection options shared by the initial connect and every reconnect.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get { return cachedOptions; }
        }

        /// <summary>
        /// Creates the connection options (persistent session, empty credentials,
        /// random client id, broker at 127.0.0.1:1883).
        /// </summary>
        private static IMqttClientOptions BuildOptions()
        {
            MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
            builder.WithCleanSession(false);
            // user name / password
            builder.WithCredentials("", "");
            builder.WithClientId(Guid.NewGuid().ToString());
            builder.WithTcpServer("127.0.0.1", 1883);
            return builder.Build();
        }

        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    if (isShuttingDown)
                    {
                        return;
                    }

                    Console.WriteLine("尝试重连!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();

            // Keep running until the user presses Enter, then really disconnect
            // before reporting it (the original printed the message right after
            // connecting, without ever disconnecting).
            Console.ReadLine();
            isShuttingDown = true;
            try
            {
                if (mqttClient.IsConnected)
                {
                    await mqttClient.DisconnectAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message + Environment.NewLine);
            }

            Console.WriteLine("已断开MQTT连接!" + Environment.NewLine);
        }

        /// <summary>
        /// Connects to the MQTT broker; a failure is reported on the console
        /// instead of being propagated to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised when the connection to the broker is established; publishes the
        /// sample messages without blocking the thread that raised the event.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);

            // async void is acceptable for an event handler, but nothing may escape
            // it: an unhandled exception here would terminate the process.
            try
            {
                for (int i = 0; i < MessageCount; i++)
                {
                    if (isShuttingDown || !mqttClient.IsConnected)
                    {
                        break;
                    }

                    await PublishInfo();
                    await Task.Delay(PublishInterval);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("发布消息失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Publishes the current local time (yyyy-MM-dd HH:mm:ss, UTF-8 encoded) to
        /// <c>topic</c> as a non-retained QoS 2 message.
        /// </summary>
        private static async Task PublishInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("发布主题不能为空!");
                return;
            }

            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder();
            builder.WithPayload(Encoding.UTF8.GetBytes(inputString));
            builder.WithTopic(topic);
            builder.WithRetainFlag(false);
            builder.WithExactlyOnceQoS();
            await mqttClient.PublishAsync(builder.Build());
        }
    }
}
View Code
如何只允许一个客户端消费同一个消息,暂时未解决!
大家有解决方法,请贴出评论。谢谢
MQTTnet 3.0.16 版本的使用
客户端:
using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace mqttsub
{
    class Program
    {
        static async Task Main(string[] args)
        {
            MqttClient mqtt = new MqttClient();
            await mqtt.StartAsync();
            Console.ReadKey();
        }
    }

    /// <summary>
    /// MQTTnet 3.0.16 client sample: connects with the settings held in
    /// <see cref="MqttClientDto"/>, subscribes to the configured topic filter after
    /// every successful connect, prints received messages and reconnects when the
    /// connection is lost.
    /// </summary>
    public class MqttClient
    {
        // Pause before a reconnect attempt so that an unreachable broker does not
        // cause a tight reconnect loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private IMqttClient client;
        private IMqttClientOptions options;
        MqttClientDto model = null;

        public MqttClient()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                // Broker address; it is passed to WithTcpServer as-is, so fill it in before running.
                IP = "",
                Port = 1883,
                // Wildcard filter: "+" stands for exactly one topic level, so this
                // matches test/123/ABC and test/DDDDD/ABC, but NOT testABC.
                Topic = "test/+/ABC"
            };
        }

        /// <summary>
        /// Creates the client, wires the message / connected / disconnected handlers
        /// and connects to the broker. Connection failures are reported on the
        /// console instead of being propagated to the caller.
        /// </summary>
        public async Task StartAsync()
        {
            try
            {
                client = new MqttFactory().CreateMqttClient();

                options = new MqttClientOptionsBuilder()
                    // Client id: use the one stored in the model so that the value
                    // printed by ConnectedHandler is the id actually sent.
                    .WithClientId(model.ClientId)
                    // Login credentials.
                    .WithCredentials(model.Account, model.PassWord)
                    // Broker host and port (the port argument is nullable); take the
                    // port from the model instead of a second hard-coded 1883.
                    .WithTcpServer(model.IP, model.Port)
                    .WithCleanSession()
                    .Build();

                // Message received from the broker.
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
                // Equivalent registration with the extension method:
                //client.UseApplicationMessageReceivedHandler(args=> {
                //    Console.WriteLine("===================================================");
                //    Console.WriteLine("收到消息:");
                //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");
                //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
                //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
                //    Console.WriteLine();
                //});

                // Connection established.
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);
                // Equivalent registration with the extension method:
                //client.UseConnectedHandler(args=> {
                //    Console.WriteLine("本客户端已连接成功");
                //    Console.WriteLine($"地址:{model.IP}");
                //    Console.WriteLine($"端口:{model.Port}");
                //    Console.WriteLine($"客户端:{model.ClientId}");
                //    Console.WriteLine($"账号:{model.Account}");
                //    Console.WriteLine();
                //    // Subscription style 1
                //    client.SubscribeAsync("主题名称").GetAwaiter().GetResult();
                //    // Subscription style 2
                //    List<MqttTopicFilter> Topics = new List<MqttTopicFilter>();
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称A", QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce });
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称B" });
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称C" });
                //    client.SubscribeAsync(Topics.ToArray()).GetAwaiter().GetResult();
                //    // Subscription style 3
                //    MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();
                //    builder.WithTopicFilter("AAA");
                //    client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();
                //});

                // Connection lost; reconnecting is implemented in the handler.
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);
                // Equivalent registration with the extension method:
                //client.UseDisconnectedHandler(args =>
                //{
                //    Console.WriteLine("本客户端已经断开连接");
                //    Console.WriteLine();
                //    try
                //    {
                //        client.ConnectAsync(options).GetAwaiter().GetResult();
                //    }
                //    catch (Exception ex)
                //    {
                //        Console.WriteLine("重连失败");
                //    }
                //});

                // Publishing from the client:
                //await client.PublishAsync("你想要的主题", "你需要发送的东西");
                //await client.PublishAsync("你想要的主题", Encoding.UTF8.GetBytes("你需要发送的东西").ToList());

                // Connect.
                await client.ConnectAsync(options);
            }
            catch (MqttConnectingFailedException ex)
            {
                Console.WriteLine("身份校验失败");
                // Also print the exception text so the actual reason reported for
                // the failed connect is visible, not only the fixed message above.
                Console.WriteLine(ex.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine("出现异常");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Called after the client has been disconnected; waits briefly and then
        /// tries to connect again with the same options.
        /// </summary>
        /// <param name="obj">Details of the disconnect (unused).</param>
        private async Task DisconnectedHandler(MqttClientDisconnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已经断开连接");
            Console.WriteLine();

            await Task.Delay(ReconnectDelay);
            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception)
            {
                Console.WriteLine("重连失败");
            }
        }

        /// <summary>
        /// Called after a successful connect; prints the connection settings and
        /// subscribes to the configured topic filter with QoS 2.
        /// </summary>
        /// <param name="obj">Details of the connect result (unused).</param>
        private async Task ConnectedHandler(MqttClientConnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已连接成功");
            Console.WriteLine($"地址:{model.IP}");
            Console.WriteLine($"端口:{model.Port}");
            Console.WriteLine($"客户端:{model.ClientId}");
            Console.WriteLine($"账号:{model.Account}");
            Console.WriteLine();

            // Subscription style 1
            // client.SubscribeAsync("主题名称").GetAwaiter().GetResult();

            // Subscription style 2
            List<MqttTopicFilter> topicFilters = new List<MqttTopicFilter>
            {
                new MqttTopicFilter { Topic = model.Topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce }
                //new MqttTopicFilter { Topic = "主题名称B" },
                //new MqttTopicFilter { Topic = "主题名称C" }
            };

            try
            {
                await client.SubscribeAsync(topicFilters.ToArray());
            }
            catch (Exception ex)
            {
                // Report the failure here instead of letting it escape the handler.
                Console.WriteLine("订阅主题失败");
                Console.WriteLine(ex.Message);
            }

            // Subscription style 3
            //MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();
            //builder.WithTopicFilter("AAA");
            //client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Called for every received message; prints its topic and its payload
        /// decoded as UTF-8.
        /// </summary>
        /// <param name="obj">Carries the received application message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // Guard against a message without payload: GetString(null) throws.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }
    }

    /// <summary>
    /// Connection settings used by the sample client.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Broker address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name).
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Broker port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic filter to subscribe to.
        /// </summary>
        public string Topic { get; set; }
    }
}
View Code
服务端:
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace MqttPub
{
    class Program
    {
        static async Task Main(string[] args)
        {
            ServerDome server = new ServerDome();
            await server.StartAsync();
            Console.Read();
            // Stop the server once the user ends the program.
            await server.StopAsync();
        }
    }

    /// <summary>
    /// MQTTnet 3.0.16 server sample: listens on the configured port, validates the
    /// credentials of connecting clients and logs client / message events to the
    /// console.
    /// </summary>
    public class ServerDome
    {
        private IMqttServer server;
        MqttClientDto model = null;

        public ServerDome()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                Topic = "test"
            };
        }

        /// <summary>
        /// Creates, configures and starts the server unless it is already running.
        /// </summary>
        public async Task StartAsync()
        {
            if (server != null && server.IsStarted)
            {
                return;
            }

            server = new MqttFactory().CreateMqttServer();
            MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();

            // Default listening port.
            serverOptions.WithDefaultEndpointPort(model.Port);

            // Validate the credentials presented by a connecting client.
            serverOptions.WithConnectionValidator(ValidateConnection);

            // A client published a message.
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
            // Equivalent registration with the extension method:
            //server.UseApplicationMessageReceivedHandler(args=>{
            //    Console.WriteLine("===================================================");
            //    Console.WriteLine("收到消息:");
            //    Console.WriteLine($"客户端:{args.ClientId}");
            //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");
            //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
            //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            //    Console.WriteLine();
            //});

            // A client connected.
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler);
            //server.UseClientConnectedHandler(args =>
            //{
            //    Console.WriteLine($"{args.ClientId}此客户端已经连接到服务器");
            //});

            // A client disconnected.
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler);
            //server.UseClientDisconnectedHandler(args => {
            //    Console.WriteLine($"断开连接的客户端:{args.ClientId}");
            //    Console.WriteLine($"断开连接类型:{args.DisconnectType.ToString()}");
            //});

            // A client subscribed to a topic.
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler);
            // A client unsubscribed from a topic.
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler);
            // Server started.
            server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler);
            // Server stopped.
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler);

            // Publishing from the server:
            //await server.PublishAsync("你想要的主题","你需要发送的东西");
            //var mqttApplicationMessage = new MqttApplicationMessage();
            //mqttApplicationMessage.Topic = "你想要的主题";
            //mqttApplicationMessage.Payload = Encoding.ASCII.GetBytes("你需要发送的东西");
            //await server.PublishAsync(mqttApplicationMessage);

            // Start the server.
            await server.StartAsync(serverOptions.Build());
        }

        /// <summary>
        /// Stops and disposes the server if it is running.
        /// </summary>
        public async Task StopAsync()
        {
            if (server == null || !server.IsStarted)
            {
                return;
            }

            await server.StopAsync();
            server.Dispose();
            // Drop the disposed instance so a later StartAsync creates a fresh one.
            server = null;
        }

        /// <summary>
        /// Accepts a connection only when the presented user name and password
        /// equal the configured account and password. A missing (null) value is
        /// treated like an empty string, so a client that sends no credentials is
        /// handled the same way as one that sends empty ones.
        /// </summary>
        /// <param name="client">Validation context of the connecting client.</param>
        private void ValidateConnection(MqttConnectionValidatorContext client)
        {
            string account = client.Username ?? string.Empty;
            string password = client.Password ?? string.Empty;

            bool accepted =
                string.Equals(account, model.Account ?? string.Empty, StringComparison.Ordinal) &&
                string.Equals(password, model.PassWord ?? string.Empty, StringComparison.Ordinal);

            if (accepted)
            {
                client.ReasonCode = MqttConnectReasonCode.Success;
                Console.WriteLine("校验成功");
            }
            else
            {
                client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                Console.WriteLine("校验失败");
            }
        }

        /// <summary>
        /// A client unsubscribed from a topic.
        /// </summary>
        /// <param name="obj">Client id and topic filter of the removed subscription.</param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");
        }

        /// <summary>
        /// A client subscribed to a topic.
        /// </summary>
        /// <param name="obj">Client id and topic filter of the new subscription.</param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// A client disconnected.
        /// </summary>
        /// <param name="obj">Client id and disconnect type.</param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            Console.WriteLine($"断开连接的客户端:{obj.ClientId}");
            Console.WriteLine($"断开连接类型:{obj.DisconnectType.ToString()}");
        }

        /// <summary>
        /// A client connected to the server.
        /// </summary>
        /// <param name="obj">Client id of the new connection.</param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            // The original threw NotImplementedException here, i.e. on every client
            // connection; log the event instead.
            Console.WriteLine($"{obj.ClientId}此客户端已经连接到服务器");
        }

        /// <summary>
        /// A message published by a client was received; prints sender, topic and
        /// the payload decoded as UTF-8.
        /// </summary>
        /// <param name="obj">Carries the client id and the application message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // Guard against a message without payload: GetString(null) throws.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }

        /// <summary>
        /// The MQTT server has started.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StartedHandler(EventArgs obj)
        {
            Console.WriteLine($"程序已经启动!监听端口为:{model.Port}");
        }

        /// <summary>
        /// The MQTT server has stopped.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StoppedHandler(EventArgs obj)
        {
            Console.WriteLine("程序已经关闭");
        }
    }

    /// <summary>
    /// Settings used by the sample server.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name) clients must present.
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password clients must present.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Listening port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic.
        /// </summary>
        public string Topic { get; set; }
    }
}
View Code
这里说明下如何使用通配符
例如,发送 topic 主题为:test/123/ABC 或者 test/234/ABC ,消费者在订阅时,可以使用:test/+/ABC 来订阅该类消息。
通配符的作用是分组订阅。
发布者发布内容为: test//status ,订阅者订阅的为:test/+/status
当然,发布者也可以在两个 / 之间增加内容,例如设备号:test/设备号/status,订阅者仍然可以用 test/+/status 收到这类消息。
主题名不能使用通配符,但是主题过滤器中可以使用通配符。
因此,订阅者可以通过过滤器结合通配符订阅一类消息。
以 MQTTnet 3.0.16 为例,开启自动确认,并设置为不保留最后一条消息。