• IOT云平台 simple(4)springboot netty实现简单的mqtt broker


    本系列教程包括:
    IOT云平台 simple(0)IOT云平台简介
    IOT云平台 simple(1)netty入门
    IOT云平台 simple(2)springboot入门
    IOT云平台 simple(3)springboot netty实现TCP Server
    IOT云平台 simple(4)springboot netty实现简单的mqtt broker
    IOT云平台 simple(5)springboot netty实现modbus TCP Master
    IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

    常见的开源mqttBroker很多,如:
    Mosquitto、emqx;
    这里简单的介绍了mqtt,然后利用springboot netty实现了简单的mqtt Broker。

    mqtt Broker:springboot netty实现;
    mqtt client:MQTT.fx工具软件;

    1 开发

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
    在这里插入图片描述
    mqtt是运行在TCP/IP之上的应用层,所以mqtt Broker基本就是个TCP Server。
    创建主要的类:
    1) MqttBrokerChannelHandler:
    server channel处理的类;
    2 )MqttBrokerChannelInitializer
    server channel初始化的类
    3)MqttBroker
    server类。
    4)MqttClientStartListener:监听到springboot启动后,启动MqttBroker。

    mqtt数据协议:
    数据结构包括:
    1)固定报头(Fixed header);
    2)可变报头(Variable header);
    3)有效载荷(Payload)。
    在这里插入图片描述

    其中,MqttBrokerChannelHandler类中实现了mqtt数据的解析处理。

    	    @Override
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
            MqttMessage mqttMessage = (MqttMessage) msg;
            log.info("--------------------------begin---------------------------*");
            log.info("来自终端:" + channelHandlerContext.channel().remoteAddress());
            log.info("接收消息:" + mqttMessage.toString());
            try {
                MqttMessageType type = mqttMessage.fixedHeader().messageType();
                MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);
                if(messageStrategy!=null){
                    messageStrategy.sendResponseMessage(channelHandlerContext.channel(),mqttMessage);
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
            log.info("--------------------------end---------------------------*");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里我们定义一个策略接口,实现对mqtt 消息的解析返回。

    public interface MessageStrategy {
        void sendResponseMessage(Channel channel, MqttMessage mqttMessage);
    }
    
    • 1
    • 2
    • 3

    然后我们就可以实现不同类型消息的策略处理,如对Connect消息的处理:

    @Slf4j
    public class ConnectAckMessageStrategy implements MessageStrategy{
    
        @Override
        public void sendResponseMessage(Channel channel, MqttMessage mqttMessage) {
            MqttConnectMessage mqttConnectMessage = (MqttConnectMessage)mqttMessage;
    
            /*---------------------------解析接收的消息----------------------------*/
            MqttFixedHeader mqttFixedHeader = mqttConnectMessage.fixedHeader();
            MqttConnectVariableHeader mqttConnectVariableHeader = mqttConnectMessage.variableHeader();
    
            /*---------------------------构建返回的消息---------------------------*/
            //	构建返回报文, 固定报头
            MqttConnAckVariableHeader mqttConnAckVariableHeader =new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.isCleanSession());
            MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeader.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader.isRetain(), 0x02);
    
            //	构建CONNACK消息体
            MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeader);
            log.info("返回消息:"+connAck.toString());
            channel.writeAndFlush(connAck);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    最后我们定义了MessageStrategyManager类实现对不同类型消息的策略处理:

    @Slf4j
    @Component
    public class MessageStrategyManager {
        public Map<MqttMessageType, MessageStrategy> messageStrategyMap = new HashMap<>();
    
        //根据消息类型获取对应的策略类
        public  MessageStrategy getMessageStrategy(MqttMessageType messageType){
            switch (messageType){
                case CONNECT:
                    return new ConnectAckMessageStrategy();
                case PUBLISH:
                    return new PublishAckMessageStrategy();
                case PUBREL:
                    return new PublishCompleteMessageStrategy();
                case SUBSCRIBE:
                    return new SubscribeAckMessageStrategy();
                case UNSUBSCRIBE:
                    return new UnSubscribeAckMessageStrategy();
                case PINGREQ:
                    return new PingMessageStrategy();
                default:
                    return null;
            }
        }
    
        //根据消息类型获取返回消息的类型
        private  static MqttMessageType getResMqttMessageType(MqttMessageType messageType){
            switch (messageType){
                case CONNECT:
                    return MqttMessageType.CONNACK;
                case PUBLISH:
                    return MqttMessageType.PUBACK;
                case PUBREL:
                    return MqttMessageType.PUBLISH;
                case SUBSCRIBE:
                    return MqttMessageType.SUBACK;
                case UNSUBSCRIBE:
                    return MqttMessageType.UNSUBACK;
                case PINGREQ:
                    return MqttMessageType.PINGRESP;
                default:
                    return null;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    2 验证

    2.1 MQTT.fx连接broker

    MQTT.fx发送消息的类型:CONNECT
    broker返回消息的类型:CONNACK
    在这里插入图片描述

    2.2 MQTT.fx向broker发送心跳

    MQTT.fx发送消息的类型:PINGREQ
    broker返回消息的类型:PINGRESP
    在这里插入图片描述

    2.3 MQTT.fx向broker发布topic

    首先MQTT.fx发布topic为:

    weather/beijing
    
    • 1

    发布内容为:

    The low temperature is 20C and the high temperature is 25 C
    
    • 1

    在这里插入图片描述
    从log看到broker已经收到topic:
    MQTT.fx发送消息的类型:PUBLISH
    这里qosLevel=AT_MOST_ONCE。
    在这里插入图片描述

    2.4 MQTT.fx断开连接broker

    MQTT.fx发送消息的类型:DISCONNECT
    在这里插入图片描述

    代码详见:
    https://gitee.com/linghufeixia/iot-simple
    code3

  • 相关阅读:
    layui2.4.3版本下拉框实现多选
    记一次ubuntu系统libc.so.6库文件被误删的修复经历
    MySQL表的约束
    PowerBI-窗口函数-INDEX
    2022卡塔尔世界杯赛程直播北京时间_足球世界杯对阵表图完整全部
    1.6.C++项目:仿muduo库实现并发服务器之channel模块的设计
    【C++】class的设计与使用(九)自定义函数对象(function object)
    特性介绍 | MySQL 测试框架 MTR 系列教程(二):进阶篇 - 内存/线程/代码覆盖率/单元/压力测试
    Springboot美得商城的设计与实现860t0计算机毕业设计-课程设计-期末作业-毕设程序代做
    IP归属地应用的几个主要特点
  • 原文地址:https://blog.csdn.net/afei8080/article/details/127899022