C#MQTT编程07--MQTT服务器和客户端(wpf版)

1、前言

上篇完成了winform版的mqtt服务器和客户端,实现了订阅和发布,效果666,长这样

 这节要做的wpf版,长这样,效果也是帅BBBB帅,wpf技术是cs程序软件的福音。

 wpf的基础知识和案例项目可以看我的另一个专栏系列文章,这里直接干搞,开发环境依然是vs2022,.netframework 4.8,mqttnet3.x。

WPF真入门教程

2、服务器搭建

1、创建项目方案

 2、添加包组件MQTTNET 

3、创建相关的目录及文件 

 

样式文件CommonStyle.xaml

 

4、设置UI布局界面 

 

                             

5、视图模型,属性绑定和命令绑定

完整代码:

using MQTTnet.Client.Receiving;
using MQTTnet;
using MQTTnet.Server;
using MQTTNETServerWPF.Command;
using MQTTNETServerWPF.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using System.Collections.ObjectModel;
using MQTTnet.Certificates;
using MQTTnet.Protocol;
using System.Runtime.Remoting.Messaging;
namespace MQTTNETServerWPF.ViewModel
{
    public class MainWindowViewModel : ViewModelBase
    {
       
        private IMqttServer mqttserver;//mqtt服务器
        List Topics = new List();
        public MainWindowViewModel()
        {
            //创建服务器对象
            mqttserver = new MqttFactory().CreateMqttServer();
            mqttserver.ApplicationMessageReceivedHandler =
                new MqttApplicationMessageReceivedHandlerDelegate(new Action(Server_ApplicationMessageReceived));//绑定消息接收事件
            mqttserver.ClientConnectedHandler =
                new MqttServerClientConnectedHandlerDelegate(new Action(Server_ClientConnected));//绑定客户端连接事件
            mqttserver.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action(Server_ClientDisconnected));//绑定客户端断开事件
            mqttserver.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件
            mqttserver.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件
            mqttserver.StartedHandler = new MqttServerStartedHandlerDelegate(new Action(Server_Started));//绑定服务端启动事件
            mqttserver.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action(Server_Stopped));//绑定服务端停止事件
        }
        #region 方法
        /// 绑定消息接收事件
        ///  ///  private void Server_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            string msg = e.ApplicationMessage.ConvertPayloadToString();
            WriteLog(">>> 收到消息:" + msg + ",QoS =" + e.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);
        }
        ///  /// 绑定客户端连接事件
        ///  ///  private void Server_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    this.ClientsList.Add(e.ClientId);
                });
                WriteLog(">>> 客户端" + e.ClientId + "连接");
            });
        }
        ///  /// 绑定客户端断开事件
        ///  ///  private void Server_ClientDisconnected(MqttServerClientDisconnectedEventArgs e)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    this.ClientsList.Remove(e.ClientId);
                });
                WriteLog(">>> 客户端" + e.ClientId + "断开");
            });
        }
        ///  /// 绑定客户端订阅主题事件
        ///  ///  private void Server_ClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter.Topic);
                    if (topic == null)
                    {
                        topic = new TopicItem { Topic = e.TopicFilter.Topic, Count = 0 };
                        Topics.Add(topic);
                    }
                    if (!topic.Clients.Exists(c => c == e.ClientId))
                    {
                        topic.Clients.Add(e.ClientId);
                        topic.Count++;
                    }
                    this.TopicsList.Clear();
                    foreach (var item in this.Topics)
                    {
                        this.TopicsList.Add($"{item.Topic}:{item.Count}");
                    }
                });
                WriteLog(">>> 客户端" + e.ClientId + "订阅主题" + e.TopicFilter.Topic);
            });
        }
        ///  /// 绑定客户端退订主题事件
        ///  ///  private void Server_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter);
                    if (topic != null)
                    {
                        topic.Count--;
                        topic.Clients.Remove(e.ClientId);
                    }
                    this.TopicsList.Clear();
                    foreach (var item in this.Topics)
                    {
                        this.TopicsList.Add($"{item.Topic}:{item.Count}");
                    }
                });
                WriteLog(">>> 客户端" + e.ClientId + "退订主题" + e.TopicFilter);
            });
        }
        ///  /// 绑定服务端启动事件
        ///  ///  private void Server_Started(EventArgs e)
        {
            WriteLog(">>> 服务端已启动!");
        }
        ///  /// 绑定服务端停止事件
        ///  ///  private void Server_Stopped(EventArgs e)
        {
            WriteLog(">>> 服务端已停止!");
        }
        ///  /// 显示日志
        ///  ///  public void WriteLog(string message)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    ConnectWords = message + "\r";
                });
            });
        }
        #endregion
        #region 属性
        private MqttServerModel server = new MqttServerModel("127.0.0.1", "1869", "boss", "1234");//服务器实体
        ///  /// 当前服务器对象
        ///  public MqttServerModel Server
        {
            get { return server; }
            set
            {
                server = value;
                OnPropertyChanged();
            }
        }
        private string connectWords = "";
        ///  /// 连接状态
        ///  public string ConnectWords
        {
            get { return connectWords; }
            set
            {
                connectWords = value;
                OnPropertyChanged();
            }
        }
        private ObservableCollection clientsList = new ObservableCollection();
        ///  /// 客户列表
        ///  public ObservableCollection ClientsList
        {
            get { return clientsList; }
            set
            {
                clientsList = value;
                OnPropertyChanged();
            }
        }
        private ObservableCollection topicsList = new ObservableCollection();
        ///  /// 主题列表
        ///  public ObservableCollection TopicsList
        {
            get { return topicsList; }
            set
            {
                topicsList = value;
                OnPropertyChanged();
            }
        }
        #endregion

        #region 命令
        ///  /// 启动命令
        ///  [Obsolete]
        public ICommand StartCommand
        {
            get
            {
                return new RelayCommand(async o => {
                    var optionBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(Server.ServerIP))
                .WithDefaultEndpointPort(int.Parse(Server.ServerPort))
                .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000))
                .WithConnectionValidator(t => {
                    string un = "", pwd = "";
                    un = Server.ServerName;
                    pwd = Server.ServerPwd;
                    if (t.Username != un || t.Password != pwd)
                    {
                        t.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    }
                    else
                    {
                        t.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                    }
                });
                    var option = optionBuilder.Build();
                    //启动
                    await mqttserver.StartAsync(option);
                }
                );
            }
        }
        ///  /// 启动命令
        ///  [Obsolete]
        public ICommand StopCommand
        {
            get
            {
                return new RelayCommand(async o => { 
                    if (server != null)
                    {
                        await mqttserver.StopAsync();
                    } 
                });
            }
        }
        #endregion
    }
}

 注意一个地方,就是给文本框添加了一个右键“清空”的功能,看看是怎么样实现的?

 6、启动测试服务器

启动成功,服务器kokokokoko!!!!

 3、客户端创建

1、添加项目MQTTNETClientWPF

2、添加客户端的组件

 

3、创建相关的类文件及目录 

 

4、设计UI布局

 布局仔细 看下

                              

5、视图模型viewmodel

模型类的属性绑定和命令绑定,数据驱动控件,即Mvvm渲染方法

using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTNETClientWPF.Command;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using MQTTNETClientWPF.Model;
using MQTTnet;
namespace MQTTNETClientWPF.ViewModel
{
    public class MainWindowViewModel : ViewModelBase
    {
        private IManagedMqttClient mqttClient; //mqtt客户端
        public MainWindowViewModel()
        {
            var factory = new MqttFactory();
            mqttClient = factory.CreateManagedMqttClient();//创建客户端对象
            //绑定断开事件
            mqttClient.UseDisconnectedHandler(async ee => {
                WriteLog(DateTime.Now.ToString() + "与服务器之间的连接断开了,正在尝试重新连接");
                // 等待 5s 时间
                await Task.Delay(TimeSpan.FromSeconds(5));
                try
                {
                    mqttClient.UseConnectedHandler(cc => {
                        WriteLog(">>> 连接到服务成功!");
                    });
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"重新连接服务器失败:{ex}");
                }
            });
            //绑定接收事件
            mqttClient.UseApplicationMessageReceivedHandler(aa => {
                 try
                {
                    string msg = aa.ApplicationMessage.ConvertPayloadToString();
                    WriteLog(">>> 消息:" + msg + ",QoS =" + aa.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + aa.ClientId + ",主题:" + aa.ApplicationMessage.Topic);
                }
                catch (Exception ex)
                {
                    WriteLog($"+ 消息 = " + ex.Message);
                } 
            });
            //绑定连接事件
            mqttClient.UseConnectedHandler(ee => {
                WriteLog(">>> 连接到服务");
            });
        }
        ///  /// 显示日志
        ///  ///  public void WriteLog(string message)
        {
            Task.Run(() => {
                App.Current.Dispatcher.Invoke(() => {
                    ConnectWords = message + "\r";
                });
            });
        }
        #region 属性
        private MqttClientModel client = new MqttClientModel("127.0.0.1", "1869", "boss", "1234", "c1");//服务器实体
        ///  /// 连接对象
        ///  public MqttClientModel Client
        {
            get { return client; }
            set
            {
                client = value;
                OnPropertyChanged();
            }
        }
        private string connectWords = "";
        ///  /// 连接状态
        ///  public string ConnectWords
        {
            get { return connectWords; }
            set
            {
                connectWords = value;
                OnPropertyChanged();
            }
        }
        private string topic = "shanghai";
        ///  /// 主题
        ///  public string Topic
        {
            get { return topic; }
            set
            {
                topic = value;
                OnPropertyChanged();
            }
        }
        private string pubmsg = "0103";
        ///  /// 发布
        ///  public string Pubmsg
        {
            get { return pubmsg; }
            set
            {
                pubmsg = value;
                OnPropertyChanged();
            }
        }
        #endregion
        #region 命令
        ///  /// 连接命令
        ///  public ICommand OpenCommand
        {
            get
            {
                return new RelayCommand(async o => {
                    var mqttClientOptions = new MqttClientOptionsBuilder()
                             .WithClientId(this.Client.ClientId)
                             .WithTcpServer(this.Client.ServerIP, int.Parse(this.Client.ServerPort))
                             .WithCredentials(this.Client.ServerName, this.Client.ServerPwd);
                    var options = new ManagedMqttClientOptionsBuilder()
                                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                                .WithClientOptions(mqttClientOptions.Build())
                                .Build();
                    //开启
                    var t = mqttClientOptions;
                    await mqttClient.StartAsync(options);
                });
            }
        }
        ///  /// 断开命令
        ///  public ICommand CloseCommand
        {
            get
            {
                return new RelayCommand(async o => {
                    if (mqttClient != null)
                    {
                        if (mqttClient.IsStarted)
                        {
                            await mqttClient.StopAsync();
                        }
                        mqttClient.Dispose();
                    }
                });
            }
        }
        ///  /// 订阅命令
        ///  [Obsolete]
        public ICommand SubscriteCommand
        {
            get
            {
                return new RelayCommand(async o => {
                    if (string.IsNullOrWhiteSpace(this.Topic))
                    {
                        WriteLog(">>> 请输入主题");
                        return;
                    }
                    //在 MQTT 中有三种 QoS 级别: 
                    //At most once(0) 最多一次
                    //At least once(1) 至少一次
                    //Exactly once(2) 恰好一次
                    //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.tbTopic.Text).WithAtMostOnceQoS().Build());//最多一次, QoS 级别0
                    await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.Topic).WithAtLeastOnceQoS().Build());//恰好一次, QoS 级别1 
                    WriteLog($">>> 成功订阅 {this.Topic}");
                });
            }
        }
        ///  /// 发布命令
        ///  public ICommand PublishCommand
        {
            get
            {
                return new RelayCommand(async o => {
                    if (string.IsNullOrWhiteSpace(this.Topic))
                    {
                        WriteLog(">>> 请输入主题");
                        return;
                    }
                    var result = await mqttClient.PublishAsync(
                        this.Topic,
                        this.Pubmsg,
                        MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 级别1 
                    WriteLog($">>> 主题:{this.Topic},消息:{this.Pubmsg},结果: {result.ReasonCode}");
                });
            }
        }
        #endregion
    }
}

6、启动客户端

比较屌

4、测试mqtt

 

1、启动服务器,客户端连接成功

 2、测试订阅

3、测试发布

 再启动一个客户端程序,有人不知道如何启动,看下面

c1发布一个消息,看看c1,c2有没有收到,很明显都收到了

 服务器显示有关信息,完全good

基于mqttnet实现的wpf版通信,完美实现,效果飞起来了,颜值高,效果好,帅B得上了飞机。

5、完整代码打包下载 

链接:https://pan.baidu.com/s/1sfQnGEEcsRTBKUSDOdCeTA 

提取码:z2hj 

讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。