.Net Core/.net 6/.Net 8 实现Mqtt客户端

news/2024/5/20 0:07:50 标签: .netcore, .net, windows

.Net Core/.net 6/.Net 8 实现Mqtt客户端

  • 客户端代码
  • 调用

直接上代码
nuget引用
MQTTnet

客户端代码


using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using System.Text;

namespace Code.Mqtt
{
    /// <summary>
    /// Mqtt客户端
    /// </summary>
    public class MqttClientBase
    {
        /// <summary>
        /// 客户端
        /// </summary>
        public IMqttClient client;


        /// <summary>
        /// 订阅主题列表
        /// </summary>
        public List<string> Topics=new List<string>();



        public MqttClientOptions options;


        public MqttClientBaseOptions _opt;


        /// <summary>
        /// 主动断开连接
        /// </summary>
        public bool off = false;
        public bool isconn = false;


        /// <summary>
        /// 创建mqtt客户端,并值接传入初始参数
        /// </summary>
        /// <param name="opt"></param>
        public MqttClientBase(MqttClientBaseOptions opt)
        {
            this._opt = opt;


            //创建客户端
            client = new MqttFactory().CreateMqttClient();

            options =new MqttClientOptions() { 
                ClientId=_opt.clientId,
                ChannelOptions=new MqttClientTcpOptions()
                {
                    Server=_opt.server,
                    Port=_opt.port,
                },

                Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)),

                //清理会话
                CleanSession=false,

                //设置心跳
                KeepAlivePeriod = TimeSpan.FromSeconds(30)
            };
        }

        /// <summary>
        /// 创建mqtt客户端,不传参数,
        /// 必须在调用 Connect之前调用过SetOption方法
        /// </summary>
        public MqttClientBase()
        {
            //创建客户端
            client = new MqttFactory().CreateMqttClient();
        }

        /// <summary>
        /// 设置参数
        /// </summary>
        /// <param name="opt"></param>
        public void SetOption(MqttClientBaseOptions opt)
        {
            options = new MqttClientOptions()
            {
                ClientId = _opt.clientId,
                ChannelOptions = new MqttClientTcpOptions()
                {
                    Server = _opt.server,
                    Port = _opt.port,
                },

                Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)),

                //清理会话
                CleanSession = false,

                //设置心跳
                KeepAlivePeriod = TimeSpan.FromSeconds(30)
            };
        }


        /// <summary>
        /// 连接服务器
        /// </summary>
        /// <param name="action">连接成功后执行</param>
        /// <param name="ConnectedAsync">连接成功事件</param>
        public void Connect(Action<MqttClientConnectedEventArgs> ConnectedAsync=null)
        {
            client.ConnectAsync(options);

            if(ConnectedAsync != null)
            {

                //连接成功事件
                client.ConnectedAsync += (args) =>
                {
                    ConnectedAsync(args);
                    return Task.CompletedTask;
                };

            }

        }

        /// <summary>
        /// 重连服务器
        /// 在连接断开事件中调用,即可实现无限轮询
        /// </summary>
        /// <param name="t">是否重复尝试重连</param>
        /// <param name="i">尝试次数</param>
        public void ReConnect()
        {
            try
            {
                client.ConnectAsync(options).Wait();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }


        public async Task AddTopic(string topic)
        {


            //更新订阅
            client.SubscribeAsync(new MqttClientSubscribeOptions()
            {
                TopicFilters = new List<MqttTopicFilter>() {
                    new MqttTopicFilter { Topic = topic }
                }
            });

            
            //将主题名称加入列表
            Topics.Add(topic);
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task DeleteTopic(string topic)
        {
            client.UnsubscribeAsync(new MqttClientUnsubscribeOptions()
            {
                TopicFilters = new List<string> { topic }
            });
            Topics.Remove(topic);
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="topic">主题</param>
        /// <param name="content">内容</param>
        /// <returns></returns>
        public async Task Publish(string topic, string content)
        {
            if(client.IsConnected)
            {
                client.PublishAsync(new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(content)
                });
            }
        }



        /// <summary>
        /// 主动断开连接
        /// </summary>
        public void Disconnect()
        {
            off = true;
            client.DisconnectAsync();
        }

        /// <summary>
        /// 断开连接事件
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task DisconnectedAsync(Action<MqttClientDisconnectedEventArgs> action)
        {
            client.DisconnectedAsync += (args) => {
                action(args);
                return Task.CompletedTask;
            };
        }



        /// <summary>
        /// 接收消息事件
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task Message(Action<string,string> action) {

            client.ApplicationMessageReceivedAsync += (args) =>
            {
                var topic = args.ApplicationMessage.Topic;
                var msg = args.ApplicationMessage.Payload.BToString();

                action(topic, msg);

                return Task.CompletedTask;
            };
        }



    }
}


调用

我这里是控制台项目



//初始化
var mqtt = new MqttClientBase(new MqttClientBaseOptions() { 
    clientId="client-1",
    username="username",
    password="password",
    server="127.0.0.1",
    port=10883
});

//断开连接事件
mqtt.DisconnectedAsync((e) => {
    Console.WriteLine("连接断开");

    //重连服务器
    mqtt.ReConnect();
});

//连接服务器
mqtt.Connect((args) => {
    /* 连接成功事件 */

    Console.WriteLine("连接成功");


    // 添加主题订阅,建议写到 连接成功事件 里面,这样重连后可以重新订阅主题
    mqtt.AddTopic("topic-1").Wait();
    mqtt.AddTopic("topic-2").Wait();
    mqtt.AddTopic("topic-3").Wait();

    // 取消主题订阅
    mqtt.DeleteTopic("topic-3").Wait();

    // 向指定主题推送消息
    mqtt.Publish("topic-1", "666666666").Wait();

});


// 收到来自服务器的消息 topic:主题  msg:消息内容
mqtt.Message((topic,msg) => { 

    Console.WriteLine($"收到消息:{topic}:{msg}");
});

// 这里暂停三秒,看三秒后主动断开连接效果
// Task.Delay(3000).Wait();

// 主动断开连接
//mqtt.Disconnect();


while (true)
{
    // 向指定主题推送消息
    mqtt.Publish("topic-1", Console.ReadLine());
}



http://www.niftyadmin.cn/n/5416992.html

相关文章

Rust错误处理和Result枚举类异常错误传递

Rust 有一套独特的处理异常情况的机制&#xff0c;它并不像其它语言中的 try 机制那样简单。 首先&#xff0c;程序中一般会出现两种错误&#xff1a;可恢复错误和不可恢复错误。 可恢复错误的典型案例是文件访问错误&#xff0c;如果访问一个文件失败&#xff0c;有可能是因…

线程有几种状态,状态之间的流转是怎样的?

Java中线程的状态分为6种&#xff1a; 1.初始(NEW)&#xff1a;新创建了一个线程对象&#xff0c;但还没有调用start()方法。 2.运行(RUNNABLE)&#xff1a;Java线程中将就绪&#xff08;READY&#xff09;和运行中&#xff08;RUNNING&#xff09;两种状态笼统的称为“运行”…

SpringBoot 多环境的配置(附带有截图)

文章目录 概要整体配置流程配置详细说明技术细节小结 概要 多环境开发 在实际项目开发中&#xff0c;一般需要针对不同的运行环境&#xff0c;如开发环境、测试环境、生产环境等&#xff0c;每个运行环境的数据库等配置都不相同&#xff0c;每次发布测试、更新生产都需要手动…

网络安全运营的工作内容(附资料下载)

【推荐】最新网络安全运营方案和实践合集&#xff08;共80多份&#xff09;.zip 网络安全运营的工作内容是一个多层次、多维度的体系&#xff0c;涵盖了多个关键领域以确保网络环境的稳定和安全。以下是一些主要的工作内容&#xff1a; 安全策略制定与实施&#xff1a; 制定网…

css网格布局简单介绍

前端网格布局是一种用于在网页上创建复杂网格系统的布局技术。它允许开发者通过简单的语法来定义和控制元素的排列方式&#xff0c;使得页面布局更加灵活和可预测。在CSS中&#xff0c;网格布局可以通过display: grid属性来实现。 特点 1. **灵活性**&#xff1a;网格布…

Python算法100例-3.10 不重复的3位数

完整源代码项目地址&#xff0c;关注博主私信源代码后可获取 1.问题描述2.问题分析3.算法设计4.确定程序框架5.完整的程序6.问题拓展 1&#xff0e;问题描述 用1、2、3、4共4个数字能组成多少个互不相同且无重复数字的三位数&#xff1f;都是多少&#xff1f; 2&#xff0e…

【图书推荐】这本书太好了!150页就能让你上手大模型应用开发《大模型应用开发极简入门:基于GPT-4和ChatGPT》

文章目录 蛇尾书特色蛇尾书思维导图作译者简介业内专家书评原文链接 如果问个问题&#xff1a;有哪些产品曾经创造了伟大的奇迹&#xff1f;ChatGPT 应该会当之无愧入选。仅仅发布 5 天&#xff0c;ChatGPT 就吸引了 100 万用户——当然&#xff0c;数据不是关键&#xff0c;关…

Python的数据库编程基础知识

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd;如果停止&#xff0c;就是低谷&#xf…