• 物联网世界的无线电报之MQTT详解


    1. 前言

    背景:最近我们的一个客户的项目使用的IoT设备,需要对接,对接方式是使用MQTT协议。所以基于此,我翻阅了一些资料,对MQTT有了一个基本的了解,以及写了一些demo进行了验证。给大家分享一下

    本次文章的标题有点标题党的嫌疑哈哈。为什么把MQTT被比喻为物联网世界的“无线电报”,主要是因为其设计原理和应用场景。就像无线电报因为简单和有效的传输机制而被广泛使用,MQTT以其简单的发布订阅模式、轻量级的数据负载和高效的数据传输能力得到了广泛应用。同时,无线电报考虑的是在有限的无线电频段上发送信息,MQTT也就被设计为应对低带宽、高延迟或不稳定的网络环境,这些都是物联网场景中常见的问题。另外,正如无线电报只在需要发送信息时才发送信号以节约能源,MQTT也非常注重节省能源,特别是对于电池供电的设备来说。通过保持开放的TCP/IP连接,以及使用“最后遗嘱”和“遗嘱”消息,MQTT可以在尽可能减少电池消耗的同时,确保信息的传输。所以,MQTT被比喻为无线电报,是因为它们在设计理念、应用场景和技术特性上有着许多相似之处。

    MQTT在物联网中的应用,就像是物联网设备与互联网之间的桥梁,连接着每一个智能设备,让我们的生活因此而变得更智能,更便捷。

    1.1. 物联网与MQTT的关系

    物联网(IoT)是指在互联网的基础上,通过信息传感设备,将任何普通的物品与互联网进行连接,以实现普通物品的信息交换和通信,从而实现智能化的概念。而MQTT(Message Queuing Telemetry Transport)则是一种专门针对物联网的通信协议,用于设备间的消息传输。

    物联网和MQTT之间的关系可以通过以下几个方面来理解:
    在这里插入图片描述

    1. 设备连接:物联网设备的数量庞大,分布广泛,而MQTT作为一种轻量级的发布/订阅消息传输协议,特别适合于网络带宽有限、设备能力弱、运行环境恶劣的物联网设备的连接。

    2. 数据传输:物联网设备通常需要实时或定时上报数据,并接收命令进行操作。MQTT的发布/订阅模型非常适合于此类场景,设备和服务器只需要关注自己感兴趣的主题,就可以实现有效的数据交换。

    3. 节省资源:物联网设备通常配置低,能源有限。MQTT的设计非常轻量,消息头部非常小,可以大大减少网络传输的数据量,降低设备的能耗,使得它非常适合于物联网设备。

    4. 服务质量:MQTT支持三种服务质量等级,可以根据物联网设备的实际需求选择最适合的服务质量等级,以保证消息的可靠传输。

    1.2. MQTT的重要性及应用场景

    MQTT 是一种基于订阅/发布模式的轻量级通信协议,为物联网提供了稳定、高效的数据传输能力。其重要性如下:
    MQTT 传输的消息分为:主题(Topic)和负载(Payload)两部分
    Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(Payload),如下图中的 temperature就是主题。Payload,可以理解为消息的内容,是指订阅者具体要使用的内容,如下图中的 24℃ 就是消息的内容
    在这里插入图片描述

    1. 轻量级MQTT协议的设计非常轻量级,尤其适合在网络条件不佳的环境中使用,这使得它成为物联网环境中的理想选择。

    2. 省电:由于其轻量级的设计,MQTT消耗的电力比其他协议更少,这对于需要长期运行的物联网设备来说至关重要。

    3. 数据传输优化:MQTT采用了订阅/发布模式,允许设备只接收和发送它们关心的消息。这大大优化了数据传输,降低了网络流量。

    4. 可扩展性:MQTT可以连接数以百万计的设备,这使得它能够应对物联网设备数量大幅度增长的挑战。

    MQTT的应用场景非常广泛,涵盖了许多物联网领域
    在这里插入图片描述

    1. 智能家居:智能家居设备,如智能灯泡、智能插座,可以通过MQTT与其他设备通信,实现设备之间的互动。

    2. 工业物联网:在工业自动化,特别是远程设备监控和控制的应用中,MQTT可以实现设备的实时状态上报和远程控制。

    3. 车载系统:车载系统可以使用MQTT实现车辆状态的实时上报,以及远程控制车辆的各种功能。

    4. 环境监测:MQTT可以用于环境监测设备,实现实时数据上报,如气象监测,空气质量监测等。

    5. 能源管理:在智能电网中,MQTT可以用于实时监测和控制能源设备,如智能电表、太阳能板等。

    2. MQTT基础

    2.1. MQTT的定义与起源

    MQTT协议最早是由IBM在1999年为监控油管线而开发的,当时的主要目标是创建一个带宽利用率高、遥测传输量大、能在恶劣环境下工作的协议。由于其轻量级、开放源代码以及易于实现的特点,MQTT协议在物联网领域得到了广泛的应用,并逐渐成为了物联网通信的重要标准。

    在2013年,MQTT协议被提交给OASIS(开放标准推进协会)进行标准化,并于2014年正式成为OASIS标准。自此以后,MQTT协议得到了更广泛的认可和应用,成为了物联网领域中最重要的通信协议之一。

    2.2. MQTT的工作原理

    MQTT 的工作原理可以简要概括为以下几个步骤:

    在这里插入图片描述

    1. 连接建立:MQTT客户端通过TCP/IP或WebSocket与MQTT代理(Broker)建立连接。连接过程中,客户端和代理之间进行协议握手,包括发送CONNECT报文进行身份验证和协商连接参数。

    2. 订阅与发布:客户端可以发送SUBSCRIBE报文订阅感兴趣的主题(Topic),也可以发送PUBLISH报文发布消息到指定主题。订阅操作使客户端能够接收到感兴趣的主题下的消息,而发布操作则将消息传递给代理,由代理负责将消息发送给订阅了该主题的客户端。

    3. QoS级别和消息确认:MQTT支持不同的服务质量(Quality of Service,QoS)级别,决定了消息的传递保证和确认机制。QoS级别包括至多一次(QoS 0)、至少一次(QoS 1)和只有一次(QoS 2)。根据所选择的QoS级别,代理和客户端之间进行消息确认和重传,以确保消息的可靠传递。

    4. 保持活跃:在连接建立后,客户端和代理之间会维持心跳机制,以保持连接的活跃状态。客户端定期发送PINGREQ报文,代理会回复PINGRESP报文,确保连接持续有效。

    5. 断开连接:当客户端或代理需要断开连接时,可以发送DISCONNECT报文来终止连接。断开连接后,客户端不再接收或发布消息。

    2.3. MQTT的协议格式

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。它定义了客户端和代理服务器之间的通信格式。
    MQTT协议的基本格式:

    MQTT消息由固定头部(Fixed Header)和可变头部(Variable Header)组成,可选地包含有效载荷(Payload)。

    1. 固定头部(Fixed Header):

      • 第一个字节包含了消息类型和标志位。
      • 剩余长度字段指示了可变头部和有效载荷的长度。
    2. 可变头部(Variable Header):

      • 可变头部的内容根据消息类型而有所不同。
      • 例如,CONNECT消息的可变头部包含协议名称、协议级别、连接标志等。
    3. 有效载荷(Payload):

      • 有效载荷是可选的,根据消息类型的不同而有所变化。
      • 例如,PUBLISH消息的有效载荷包含要发布的消息内容。

    以下是MQTT协议中常见消息类型的格式示例:

    1. CONNECT消息:

      Fixed Header:
      0x10  // 消息类型:CONNECT
      Variable Header:
      Length // 可变头部长度
      Protocol Name // 协议名称
      Protocol Level // 协议级别
      Connect Flags // 连接标志
      Keep Alive // 保持连接时间
      Payload:
      Client Identifier // 客户端标识符
      Will Topic // 遗嘱消息主题
      Will Message // 遗嘱消息内容
      User Name // 用户名
      Password // 密码
      ```
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    2. PUBLISH消息:

      Fixed Header:
      0x30  // 消息类型:PUBLISH
      Variable Header:
      Length // 可变头部长度
      Topic Name // 主题名称
      Packet Identifier // 数据包标识符
      Payload:
      Application Message // 应用消息内容
      ```
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    3. SUBSCRIBE消息:

      Fixed Header:
      0x82  // 消息类型:SUBSCRIBE
      Variable Header:
      Length // 可变头部长度
      Packet Identifier // 数据包标识符
      Payload:
      Topic Filter // 订阅主题过滤器
      Requested QoS // 请求的服务质量等级
      ```
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    4. UNSUBSCRIBE消息:

      Fixed Header:
      0xA2  // 消息类型:UNSUBSCRIBE
      Variable Header:
      Length // 可变头部长度
      Packet Identifier // 数据包标识符
      Payload:
      Topic Filter // 订阅主题过滤器
      ```
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    5. PUBACK消息:

      Fixed Header:
      0x40  // 消息类型:PUBACK
      Variable Header:
      Length // 可变头部长度
      Packet Identifier // 数据包标识符
      ```
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    2.4. 用java造个轮子

    虽然JAVA MQTT的实现 已经有很多sdk 如eclipse 实现的组件。org.eclipse.paho.client.mqttv3。为了方便理解,自己造一个简单的MQTT协议实现的轮子,试试。

    package com.icepip.project.mqtt.test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    
    /**
     *  使用最原始的方式 进行MQTT连接和使用。
     *  仅作为学习验证使用。
     * @author 冰点
     * @version 1.0.0
     * @date 2023/9/9 13:19
     */
    
    public class MqttClientExample {
    
        private static final String BROKER = "172.20.6.xx"; // MQTT代理服务器的地址
        private static final int PORT = 1883;  
        private static final String USERNAME = "root"; // MQTT代理服务器的用户名
        private static final String PASSWORD = "123456"; // MQTT代理服务器的密码
    
        public static void main(String[] args) {
            try {
                // 创建Socket连接
                Socket socket = new Socket(BROKER, PORT);
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
    
                // 发送CONNECT消息
                byte[] connectMessage = buildConnectMessage(USERNAME, PASSWORD);
                outputStream.write(connectMessage);
    
                // 接收CONNACK消息
                byte[] connackHeader = new byte[2];
                inputStream.read(connackHeader);
                byte connackMessageType = connackHeader[0];
                byte connackRemainingLength = connackHeader[1];
                byte[] connackPayload = new byte[connackRemainingLength];
                inputStream.read(connackPayload);
    
                if (connackMessageType == 0x20 && connackPayload[1] == 0x00) {
                    System.out.println("Connected to MQTT broker");
                } else {
                    System.out.println("Failed to connect to MQTT broker");
                }
    
                // 发送PUBLISH消息
                byte[] publishMessage = {0x30, 0x0D, 0x00, 0x09, 0x73, 0x65, 0x6E, 0x73, 0x6F, 0x72, 0x73, 0x2F, 0x74, 0x65, 0x73, 0x74};
                outputStream.write(publishMessage);
    
                // 接收PUBACK消息
                byte[] pubackHeader = new byte[4];
                inputStream.read(pubackHeader);
                byte pubackMessageType = pubackHeader[0];
                byte pubackRemainingLength = pubackHeader[1];
    
                if (pubackMessageType == 0x40 && pubackRemainingLength == 0x02) {
                    System.out.println("Message published successfully");
                } else {
                    System.out.println("Failed to publish message");
                }
    
                // 关闭连接
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static byte[] buildConnectMessage(String username, String password) {
            String clientId = "my_client";
    
            // 构建MQTT 5.0 Connect消息的固定报头
            byte[] fixedHeader = {(byte) 0x10}; // Connect消息类型
    
            // 构建MQTT 5.0 Connect消息的可变报头
            byte[] variableHeader = {
                    0x00, 0x04, 'M', 'Q', 'T', 'T', // Protocol Name
                    0x05, // Protocol Level (MQTT 5.0)
                    (byte) 0xC2, // Connect Flags (Username, Password, Clean Start)
                    0x00, 0x3C, // Keep Alive
                    0x00, // Properties Length
            };
    
            // 构建MQTT 5.0 Connect消息的负载
            byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
            byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            byte[] payload = new byte[6 + clientIdBytes.length + usernameBytes.length + passwordBytes.length];
            int index = 0;
            payload[index++] = 0x00;
            payload[index++] = (byte) clientIdBytes.length;
            System.arraycopy(clientIdBytes, 0, payload, index, clientIdBytes.length);
            index += clientIdBytes.length;
            payload[index++] = 0x00;
            payload[index++] = (byte) usernameBytes.length;
            System.arraycopy(usernameBytes, 0, payload, index, usernameBytes.length);
            index += usernameBytes.length;
            payload[index++] = 0x00;
            payload[index++] = (byte) passwordBytes.length;
            System.arraycopy(passwordBytes, 0, payload, index, passwordBytes.length);
            index += passwordBytes.length;
    
            // 构建完整的MQTT 5.0 Connect消息
            byte[] connectMessage = new byte[1 + variableHeader.length + payload.length];
            connectMessage[0] = (byte) (variableHeader.length + payload.length); // Remaining Length
            System.arraycopy(variableHeader, 0, connectMessage, 1, variableHeader.length);
            System.arraycopy(payload, 0, connectMessage, 1 + variableHeader.length, payload.length);
    
            return connectMessage;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    3. 深入理解MQTT

    3.1. MQTT的主要组件

    MQTT主要由以下三个主要组件构成:
    在这里插入图片描述

    3.1.1. Publisher(发布者)

    如图中 传感器的网关就是发布者。会将终端传感器获取到的数据通过通信模块发布。发布者是MQTT通信中的发送消息的实体。它可以是任何具有MQTT客户端功能的设备、应用程序或服务。发布者负责创建并发送PUBLISH报文,将消息发布到一个或多个主题(Topic)。发布者将消息发送到MQTT代理(Broker),代理将负责将消息传递给订阅了相应主题的订阅者。

    3.1.2. Subscriber(订阅者)

    订阅者是MQTT通信中的接收消息的实体。类似于发布者,订阅者也可以是任何具有MQTT客户端功能的设备、应用程序或服务。订阅者通过发送SUBSCRIBE报文来订阅一个或多个主题,以表明它对这些主题下的消息感兴趣。一旦订阅成功,MQTT代理将向订阅者发送相应主题下的消息(通过PUBLISH报文)。订阅者可以根据自身需求对消息进行处理或响应。

    3.1.3. Broker(代理)

    如上图中MQTT代理服务器。一般可以使用开源的MQTT代理服务工具部署和优化使用,如Eclipse Mosquitto、HiveMQ、rabbitmq 添加MQTT插件也是可以作为MQTT代理服务。代理服务是MQTT通信中的中间件组件,负责转发消息并处理客户端的连接和订阅请求。代理接收来自发布者的PUBLISH报文,并将这些消息传递给订阅了相应主题的订阅者。它还负责维护客户端的连接状态、处理订阅和取消订阅的请求,并根据所选的QoS级别提供消息确认和重传等功能。

    MQTT代理充当了发布者和订阅者之间的中介,它管理主题的订阅关系,并确保消息的可靠传递。代理可以是一个独立的服务器或嵌入到其他应用程序中。在一个MQTT网络中,可以有多个代理存在,形成一个分布式的消息传递系统。
    从网上搜集和整理做了一个对比

    代理服务器开源MQTT版本支持主要特性优点:缺点
    MosquittoMQTT 3.1, 3.1.1, 5.0轻量级,高性能,完全符合MQTT协议规范占用资源少,适合在各种设备上运行功能相对简单,缺乏一些高级特性
    RabbitMQMQTT 3.1.1不仅支持MQTT,还支持多种消息协议,如AMQP,STOMP等功能强大,支持多种协议消息处理性能稍低,配置相对复杂
    HiveMQMQTT 3.1, 3.1.1, 5.0企业级别的特性,如集群,Websockets, SSL等支持大量并发连接,功能丰富,适合大型项目需要购买,价格较高
    EMQ XMQTT V5.0/V3.1/V3.1.1支持多种物联网协议,亿级别的物联网设备连接支持大规模设备连接,协议支持丰富对硬件资源要求较高
    VerneMQMQTT 3.1.1, 5.0高性能,分布式,可扩展在并发处理和容错方面表现优秀文档相对较少
    MoscaMQTT 3.1, 3.1.1, 5.0Node.js编写,适合嵌入其他Node.js应用简易部署,与Node.js应用集成方便功能相对有限,社区活跃度低

    我用docker 搭建了Mosquitto 代理服务,大家仅供学习参考
    《Spring Boot 集成MQTT代码示例》

    3.2. MQTT的主题(Topics)

    MQTT中的主题(Topic)是消息发布和订阅的中心概念。主题是一种逻辑上的命名机制,用于标识和分类不同类型的消息。发布者可以将消息发布到一个或多个主题,而订阅者可以订阅一个或多个主题以接收相应的消息。

    主题使用层次结构的命名方式,由多个层级组成,各层级之间使用斜杠(/)分隔。例如,sensors/temperature是一个主题,其中sensors是一级层级,temperature是二级层级。主题层级的数量没有限制,可以根据需要创建多层级的层次结构。

    MQTT的主题可以使用通配符进行更灵活的消息过滤和订阅控制:

    • 单层通配符(+):可以匹配一个层级,例如,sensors/+/value可以匹配sensors/temperature/valuesensors/humidity/value等主题。
    • 多层通配符(#):可以匹配零个或多个层级,但只能出现在主题的末尾,例如,sensors/#可以匹配sensors/temperaturesensors/humidity/value等主题。

    主题的设计需要根据具体的应用需求和数据结构进行合理规划,以便实现有效的消息传递和订阅管理。

    3.3. MQTT消息类型

    MQTT定义了不同类型的消息,用于实现发布-订阅模式的消息传递。以下是一些常见的MQTT消息类型:
    在这里插入图片描述

    3.3.1. Connect/Connack(连接/连接确认)

    Connect消息由客户端发送给代理服务器,用于建立与代理之间的连接。它包含连接请求的参数,如客户端ID、用户名、密码等。代理服务器会发送Connack消息作为响应,指示连接是否成功建立。

    3.3.2. Publish(发布)

    Publish消息用于将消息发布到指定的主题。它包含消息的内容和主题信息。发布消息由发布者发送给代理服务器,然后由代理服务器将消息传递给订阅了相应主题的订阅者。

    3.3.3. Subscribe/Suback(订阅/订阅确认)

    Subscribe消息由订阅者发送给代理服务器,用于订阅一个或多个主题。它指定了订阅者感兴趣的主题以及订阅的QoS级别。代理服务器发送Suback消息作为响应,指示订阅是否成功,并包含订阅的QoS级别信息。

    3.3.4. Unsubscribe/Unsuback(取消订阅/取消订阅确认)

    Unsubscribe消息由订阅者发送给代理服务器,用于取消订阅一个或多个主题。它指定要取消订阅的主题。代理服务器发送Unsuback消息作为响应,指示取消订阅是否成功。

    3.3.5. Pingreq/Pingresp(心跳请求/心跳响应)

    Pingreq消息由客户端发送给代理服务器,用于保持活动连接。它用于检测连接是否仍然有效。代理服务器发送Pingresp消息作为响应,表示连接仍然活动。

    3.3.6. Disconnect(断开连接)

    Disconnect消息由客户端发送给代理服务器,用于正常断开连接。它表示客户端希望关闭连接。

    这些消息类型组成了MQTT协议的基本消息集,通过这些消息,发布者可以发布消息,订阅者可以订阅感兴趣的主题,并进行消息的传递和交互。

    3.4. MQTT服务质量(QoS)级别

    MQTT定义了不同的服务质量级别(QoS)用于控制消息的传递可靠性。以下是MQTT支持的QoS级别:

    3.4.1. QoS 0: 至多一次

    QoS 0是最低级别的传递保证。在此级别下,消息会被发送一次,但没有确认机制,也不会进行重传。这意味着消息可能会发生丢失或重复。QoS 0适用于不要求消息可靠性的场景,如传递实时数据,或者对消息丢失或重复不敏感的应用。

    3.4.2. QoS 1: 至少一次

    QoS 1提供了消息传递的至少一次保证。在此级别下,消息会被发送并进行确认。如果发送方没有收到确认,它会重传消息,直到收到确认为止。这确保了消息至少会被传递一次,但可能会出现重复消息。QoS 1适用于要求至少一次传递的场景,如传递重要数据,确保消息到达目标。

    3.4.3. QoS 2: 只有一次

    QoS 2提供了消息传递的确保只有一次保证。在此级别下,消息会进行两阶段的握手过程,以确保消息只被传递一次。这种级别的传递保证是最高的,但同时也是最耗费资源的。QoS 2适用于要求确保只有一次传递的场景,如金融交易等关键应用。

    使用不同的QoS级别可以根据应用需求平衡消息传递的可靠性和开销的权衡。

    3.6. MQTT的 遗嘱消息

    MQTT的“遗嘱”消息(Last Will and Testament)是一种非常有用的特性,它让客户端在连接到MQTT服务器时发布一个预定义的消息,这个消息会在以下情况被服务器发送出去:

    1. 如果客户端的连接非正常断开,比如设备突然断电或者网络中断。
    2. 如果客户端在一段时间内没有发送任何数据(包括PING请求)以维持连接。

    这种“遗嘱”消息的主要用途是让其他客户端知道这台设备已经离线,这在很多物联网应用中非常有用,比如设备监控、故障检测等。

    举个例子,如果我们有一个温度传感器,它每5秒向服务器发送一次当前的温度数据。如果服务器在10秒内没有收到任何数据,那么可以判断这个传感器已经离线,此时服务器就会发布这个传感器设置的“遗嘱”消息,其他订阅了这个消息的客户端就会收到设备离线的通知。

    “遗嘱”消息在MQTT协议中设定比较简单,只要在建立连接时设置好“遗嘱”消息的主题和内容,以及消息的QoS等级和retain标志即可。一旦设备断开连接,MQTT服务器就会自动发送出“遗嘱”消息。

    3.6. MQTT的保持连接和断开连接(Keep Alive & Clean Session)

    保持连接(Keep Alive)是MQTT协议中的一个机制,用于确保客户端与代理服务器之间的连接处于活动状态。客户端在连接建立后会发送一个心跳请求(Pingreq)给代理服务器,代理服务器则会响应一个心跳响应(Pingresp)。通过定期的心跳交换,可以检测连接是否仍然有效,避免连接因为长时间的空闲而被关闭。

    另外,MQTT还使用"Clean Session"标志来管理客户端与代理服务器之间的连接状态。当客户端连接到代理服务器时,可以设置"Clean Session"标志为true或false。如果设置为true,表示建立一个新的会话,之前的会话状态和订阅信息都会被清除。如果设置为false,表示恢复之前的会话,保留之前的订阅信息和状态。

    保持连接和"Clean Session"机制使得MQTT可以实现持久的连接和断线重连,同时还能保持会话状态和订阅信息。

    3.7. MQTT的安全性:SSL/TLS

    MQTT提供了通过SSL/TLS协议进行加密和身份验证的安全传输机制。SSL/TLS(Secure Sockets Layer/Transport Layer Security)是一种广泛使用的加密协议,用于保护网络通信的安全性。

    通过在MQTT连接上使用SSL/TLS,可以实现以下安全性特性:

    • 数据加密:通过SSL/TLS协议,可以对传输的MQTT消息进行加密,确保数据在传输过程中的机密性。
    • 身份验证:SSL/TLS协议还允许客户端和服务器进行身份验证,确保通信双方的身份合法和可信。
    • 抵御中间人攻击:SSL/TLS协议提供了抵御中间人攻击的机制,确保通信双方之间的通信不被窃听、篡改或伪装。

    要使用SSL/TLS进行MQTT安全传输,需要在代理服务器上配置和启用SSL/TLS支持,并配置证书、私钥等安全相关的参数。客户端需要使用支持SSL/TLS的MQTT库,并配置正确的证书和验证方式来建立安全连接。

    通过SSL/TLS的加密和身份验证机制,MQTT可以提供更高级别的安全性,适用于对数据保密性和完整性要求较高的应用场景,如物联网设备通信、金融交易等。

    4.1. 如何设置和使用MQTT

    要设置和使用MQTT,需要进行以下步骤:

    1. 安装MQTT代理服务器:首先需要选择一个MQTT代理服务器,例如Eclipse Mosquitto、ActiveMQ、HiveMQ等。根据服务器的安装指南,将代理服务器安装在你的服务器或本地计算机上。

    2. 配置MQTT代理服务器:根据代理服务器的文档和配置指南,进行服务器的配置。这包括设置监听端口、启用SSL/TLS、配置用户认证等。

    3. 选择MQTT客户端库:根据你的编程语言和平台选择适合的MQTT客户端库,例如Paho MQTT库、MQTT.js、MQTTnet等。这些库提供了MQTT协议的实现,可以用于开发MQTT客户端应用程序。

    4. 编写MQTT客户端应用程序:使用选择的MQTT客户端库,编写你的MQTT客户端应用程序。这包括连接到MQTT代理服务器、发布消息、订阅主题和处理接收到的消息等功能。

    5. 运行和测试:运行你的MQTT客户端应用程序,并进行测试。确保你的客户端能够正确连接到代理服务器,并能够发布和接收消息。

    设置和使用MQTT需要一定的技术知识和编程经验。确保你熟悉所选择的MQTT代理服务器和客户端库的文档,并按照指南进行配置和开发。

    4.2. MQTT客户端和服务器的示例

    使用Java和Paho MQTT库创建一个MQTT客户端
    创建了一个名为my_client的MQTT客户端,连接到127.0.0.1代理服务器的端口1883。客户端设置了连接回调函数,处理连接丢失、接收到消息和消息发布完成等事件。然后客户端连接到MQTT代理服务器,订阅sensors/temperature主题,并发布一个值为25的消息到该主题。最后,客户端进入循环以持续处理消息。
    确保将mqtt.example.com替换为实际的MQTT代理服务器地址,并根据需要修改客户端ID、主题和消息内容等信息。

    <dependencies>
        <dependency>
            <groupId>org.eclipse.pahogroupId>
            <artifactId>org.eclipse.paho.client.mqttv3artifactId>
            <version>1.2.5version>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    import org.eclipse.paho.client.mqttv3.*;
    
    public class MqttClientExample {
    
        private static final String BROKER = "tcp://127.0.0.1:1883"; // MQTT代理服务器的地址和端口
        private static final String CLIENT_ID = "my_client"; // 客户端ID
    
        public static void main(String[] args) {
            try {
                MqttClient client = new MqttClient(BROKER, CLIENT_ID);
                MqttConnectOptions options = new MqttConnectOptions();
    
                // 连接回调函数
                client.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {
                        System.out.println("Connection lost");
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                        System.out.println("Received message: " + new String(mqttMessage.getPayload()));
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        // 消息发布完成
                    }
                });
    
                // 连接到MQTT代理服务器
                client.connect(options);
    
                // 订阅主题
                String topic = "sensors/temperature";
                client.subscribe(topic);
    
                // 发布消息到主题
                String message = "25";
                client.publish(topic, message.getBytes(), 0, false);
    
                // 持续运行
                while (true) {
                    // 处理消息
                    client.loop();
                }
    
                // 断开连接
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    4.3. MQTT在物联网项目中的应用案例分析

    实际上MQTT在各个领域的物联网应用中都发挥着重要作用。其轻量级、可靠性和灵活性使得MQTT成为物联网通信的理想选择。具体的应用案例可以根据实际需求和场景进行定制和扩展。

    1. 传感器数据收集和监控:物联网中的传感器通常会采集环境数据,如温度、湿度、光照等。MQTT可以用于将传感器数据发布到相应的主题,供其他设备或应用程序进行监控和分析。

    2. 远程设备控制:通过MQTT,可以远程控制物联网设备的行为。例如,通过发布控制指令到相应主题,可以控制智能家居设备的开关、调节灯光亮度等。

    3. 能源管理和优化:MQTT可以用于实时监测能源消耗和优化。通过将能源计量设备与MQTT集成,可以实时收集能源数据并将其发布到相应的主题。这样,能源管理系统可以订阅这些主题并进行能源消耗分析,以便进行优化和节约能源。

    4. 车联网:MQTT在车联网中也得到广泛应用。例如,车辆可以通过MQTT发布其位置信息,供其他车辆或应用程序进行实时跟踪和导航。同时,车辆也可以订阅相关主题以接收交通状况、路况等信息。

    5. 物流和供应链管理:MQTT可以用于物流和供应链管理,以实现实时跟踪和监控。通过将物流设备、仓库和运输工具与MQTT集成,可以实时发布物流信息、货物跟踪信息等,以提高物流效率和可视化管理。

    5. 常见问题

    从网上搜集了一些问题和一些MQTT的面试题做了解答。巩固一下学习成果。

    1. 什么是MQTT以及它的主要应用场景?
      MQTT即Message Queuing Telemetry Transport,是一种基于发布/订阅模式的轻量级通信协议。其主要应用在物联网领域,包括智能家居、工业自动化、车载系统、环境监测和能源管理等。

    在一个智能工厂中,设备通过MQTT协议,发布其运行状态(例如温度、速度等)到相应的主题,而监控中心通过订阅这些主题以实时获取设备状态。在这个应用场景中,MQTT提供的发布/订阅模式恰好满足了智能工厂对实时数据传输的需求。

    1. MQTT的工作原理及其发布/订阅系统。
      MQTT的工作原理主要是基于发布/订阅模式。发布者发布消息到MQTT代理(Broker),每个订阅了相关主题的订阅者都会接收到这个消息。这样,就实现了设备间的通信。

    假设一个智能家居系统中,空调设备可以发布自己的温度信息到"home/ac/temperature"这个主题,而家庭主人的手机APP订阅了这个主题,那么当空调设备的温度发生变化时,家庭主人的手机APP就会收到这个消息。

    1. MQTT主题(Topics)?
      主题是MQTT中的一个重要概念,它本质上是一个字符串,用于区分不同的消息内容。仅当订阅者订阅了某主题,才能接收到发布到该主题的消息。

    在一个农业灌溉系统中,每一个传感器设备可以拥有一个特定的主题,如"farm/sensor/1/moisture",用于发布该传感器检测到的土壤湿度信息。

    1. MQTT的QoS有哪些级别,分别表示什么?
      MQTT的QoS(Quality of Service)有三个等级:0、1、2。QoS 0表示“最多交付一次”,即消息可能丢失或重复;QoS 1表示“至少交付一次”,消息不会丢失但可能重复;QoS 2表示“只交付一次”,即消息不会丢失也不会重复。

    在一个火警报警系统中,火警传感器发现火警后,会向"firealarm/sensor/1"这个主题发布警报。由于这是一个紧急情况,我们需要确保消息一定能发送到,所以我们可以选择QoS 2,即“只交付一次”。

    1. MQTT的保活(Keep Alive)机制和清除会话(Clean Session)标志。
      保活是MQTT协议中的一个定时器,用于检测客户端是否在线。清除会话标志用于指定在连接断开后,是否保留客户端的会话信息。

    假设一个物联网设备由于网络问题,从MQTT代理断开。如果设置了保活时间,当超过这个时间,MQTT代理没有收到设备的消息,就会认为设备离线。如果之前设定了清除会话标志,设备再次连接时,MQTT代理不会继续之前的会话,而是创建一个新的会话。

    1. MQTT如何保证数据的安全传输?
      MQTT可以通过SSL/TLS进行数据加密,以保证数据的安全传输。此外,也可以通过用户名和密码进行客户端连接的认证。

    在一个医疗设备监控系统中,医疗设备的数据通常包含病人的敏感信息,我们需要保证这些信息不被第三方窃取。在这种情况下,我们可以使用MQTT的SSL/TLS加密传输方式,为数据提供安全保护。

    1. MQTT与HTTP的主要区别是什么?为什么在物联网应用中MQTT比HTTP更受欢迎?
      MQTT与HTTP的主要区别在于它们的设计目标和使用场景。HTTP是一种基于请求/响应模式的无状态协议,主要用于Web服务;而MQTT是一种基于发布/订阅模式的轻量级协议,特别适合在网络条件不佳的物联网环境中使用。因此,在物联网应用中,MQTT因其轻量级和高效的特点,往往比HTTP更受欢迎。

    在一个车载系统的应用中,例如,车辆通过MQTT发布自身的位置信息,而监控中心订阅相关主题获得车辆位置。在这种应用环境下,由于车载系统的网络环境常常变化,使用HTTP可能会造成大量的连接开销和延迟,而使用MQTT则可以持续保持长连接,实时地获取或推送消息。

    1. 举例说明如何在物联网项目中使用MQTT。
      在智能家居项目中,例如,智能灯具可以作为MQTT的发布者,发布其状态(开或关)到某个主题;用户的手机APP作为订阅者,订阅该主题,当灯具的状态改变时,可以及时接收到通知。

    在一个智能农业系统中,农场内的传感设备(如温湿度传感器、光照传感器等)可以通过MQTT发布其采集到的环境数据到主题上;农场管理者的终端设备订阅相关主题,便可以实时获取地块的各类环境信息,有助于农作物的精确管理。

    1. 如何在大规模的物联网网络中优化MQTT的性能?

    优化MQTT的性能可以从多个方向来考虑。首先,在大规模设备接入时,我们需要尽可能保持连接、订阅、主题和消息的简洁,来减少内存和网络带宽的消耗。其次,我们可以通过选择高效的主题策略和合理的QoS等级,来平衡消息的传输可靠性和网络带宽的使用。最后,我们还可以通过引入分布式MQTT代理和负载均衡技术,来提高系统的扩展性和稳定性。

    举例来说,如果我们正在管理一个大型的智能电网系统,系统中有数十万个智能电表需要通过MQTT发布电量信息。为了优化性能,我们可以为每个电表分配一个专属主题,如"smartgrid/meter/ID",其中ID是每个电表的唯一标识符。这样,每个电表的电量信息就可以发布到各自的主题上,避免在单一主题上产生大量的消息流量。同时,我们还可以设置适当的QoS等级,如QoS 1,来确保消息的可靠传输,同时控制带宽的使用。

    1. 描述一下在使用MQTT过程中遇到的最大的问题,以及你是如何解决的?

    在一个项目中,我遇到了一个设备频繁掉线重连的问题。通过分析,我发现这是由于设备和MQTT代理之间的网络连接不稳定所导致的。为了解决这个问题,我调整了MQTT的保活参数,并且在设备端引入了连接检测和自动重连的机制,成功解决了这个问题。

    在一个智能物流系统中,我曾遇到过MQTT设备频繁掉线的问题。经过排查,发现是由于网络环境不稳定导致的。为了解决这个问题,我调整了MQTT的保活时间参数,并在设备端增加了检查连接状态和自动重连的机制。这样即使网络环境不稳定,设备也能在掉线后自动重新连接到MQTT代理。

    1. 如果需要在MQTT协议上添加新的功能或进行改进,你有什么建议?

    假如在一个远程医疗监控系统中,需要将患者的生理数据在断网情况下进行存储,并在网络恢复后发送到医院的服务器。当前的MQTT协议并未直接支持这种需求。因此,我建议在MQTT协议中增加一种QoS 3等级,用于支持设备离线时的消息存储和重传,这将大大提高物联网设备的消息接收可靠性。

    1. 讲述一下MQTT在不同的通信环境下(如低带宽、不稳定连接)的表现,以及如何优化?

    在低带宽环境中,MQTT表现优秀,因为它的协议设计非常轻量级,并且可以通过调整QoS等级来限制带宽使用。在不稳定的网络连接中,我们可以调整保活参数,或者在客户端实现自动重连的功能,来提高MQTT的连接稳定性。

    在一个偏远地区的农业物联网系统中,由于网络环境较差,带宽低且连接不稳定,我们可以选择MQTT的QoS 0级别,以最小化带宽使用。同时,我们可以在设备端实现自动重连的功能,并适当增大保活时间,以提高MQTT的连接稳定性。

    1. 如何确保MQTT在大规模设备接入的情况下的稳定性和安全性?

    为了确保MQTT在大规模设备接入的情况下的稳定性和安全性,我们可以采用物理隔离和逻辑隔离并行使用的方式。物理隔离可以通过部署多个MQTT代理来实现;逻辑隔离可以通过使每个设备具有唯一的访问权限和专属的主题来实现。

    在一个智慧城市项目中,需要管理数以百万计的智能设备。为了保持MQTT的稳定性和安全性,我们可以部署多个MQTT代理服务器,并使用负载均衡技术来分配设备连接。同时,为了确保安全性,我们可以为每个设备配置唯一的访问权限和专属的主题,并使用SSL/TLS进行加密。

    1. 谈谈你对MQTT 5.0版本的理解,它带来了哪些新特性,对物联网有何影响?

    MQTT 5.0带来了一些新特性,如增强的认证机制、消息属性和共享订阅等。这些新特性将使MQTT能够满足更复杂、更高级的物联网应用需求。同时,MQTT 5.0的出现也显示了该协议对物联网通信标准的重要性和它在物联网领域的广泛应用。

    以智能家居为例,MQTT 5.0的新特性——共享订阅,可以使得家中的多台设备(如手机、平板、电视等)订阅同一主题并共享消息,这无疑为多设备协同工作带来了便利。此外,增强的认证机制提升了设备与服务器之间通信的安全性,满足了当前物联网领域对于安全性的严格需求。

    6.参考资料

    本文学习和借鉴了一些MQTT 官方文档

    1. MQTT.org: MQTT协议的官方网站,提供了许多有关MQTT基础知识以及高级使用的信息。

    2. 官方MQTT 5.0规格文档, MQTT Version 5.0 - Official Specification

    3. Eclipse Paho MQTT Documentation:Eclipse的Paho项目提供了一系列的MQTT客户端库,并附有文档。

    4. Mosquitto Documentation

    5. https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/

    在这里插入图片描述

  • 相关阅读:
    Hexo + Gitee安装部署(打造你的个人博客)
    11个程序员必备简捷开发辅助工具
    ARouter There is no route match the path 原因
    Python 快速排序
    CENTURY模型应用
    Zabbix预处理和数据开源节流
    Camtasia 2022全新版超清录制电脑视频
    电子设备行业智能供应链系统:打破传统供应链壁垒,提升电子设备企业管理效能
    Docker数据卷
    线性方程求解算法(Java实现)
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/132767241