• Python与mqtt的数据读取


    mqtt协议介绍

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

    MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

    MQTT 是一个 发布订阅线路协议(publish/subscription wire protocol),发布订阅系统工作原理类似于消息总线。你将一条消息发布到一个主题(topic)上,那么所有订阅了该主题的客户端都可以获得该消息的一份拷贝。它提供了一对多的消息分发机制,从而实现与应用程序的解耦。这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。

    对于Broker(MQTT服务器)来说,不论我们是发布方,还是订阅方,都是属于客户端。

    与HTTP的区别

    HTTP 协议是一个无状态的协议,每个 HTTP 请求为 TCP 短连接,每次请求都需要重新创建一个 TCP 连接(可以通过 keep-alive 属性来优化 TCP 连接的使用,多个 HTTP 请求可以共享该 TCP 连接);而 MQTT 协议为长连接协议,每个客户端都会保持一个长连接。与 HTTP 协议相比优势在于:

    MQTT 的长连接可以用于实现从设备端到服务器端的消息传送之外,还可以实现从服务器端到设备端的实时控制消息发送,而 HTTP 协议要实现此功能只能通过轮询的方式,效率相对来说比较低;

    MQTT 协议在维护连接的时候会发送心跳包,因此协议以最小代价内置支持设备 “探活” 的功能,而 HTTP 协议要实现此功能的话需要单独发出 HTTP 请求,实现的代价会更高;

    低带宽、低功耗。MQTT 在传输报文的大小上与 HTTP 相比有巨大的优势,因为 MQTT 协议在连接建立之后,由于避免了建立连接所需要的额外的资源消耗,发送实际数据的时候报文传输所需带宽与 HTTP 相比有很大的优势,参考网上有人做的测评,发送一样大小的数据,MQTT 比 HTTP 少近 50 倍的网络传输数据,而且速度快了将近 20 倍。在网上有人做的另外一个评测显示,接收消息的场景,MQTT 协议的耗电量为 HTTP 协议的百分之一,而发送数据的时候 MQTT 协议的耗电量为 HTTP 协议的十分之一;

    MQTT 提供消息质量控制(QoS),消息质量等级越高,消息交付的质量就越有保障,在物联网的应用场景下,用户可以根据不同的使用场景来设定不同的消息质量等级;

    paho-mqtt包

    介绍之前需要说明,回调函数是这个包的特点之一,可以通过自己定义回调函数,决定返回的信息的内容、格式或者处理方法等。因此使用该包连接、发布、订阅等操作之前,需要定义并引用回调函数。

    1. 函数、回调简单总结:
    函数功能callbackcallback内容
    client()创建实例
    connect()客户端连接brokeron_connect()broker回应连接请求
    disconnect()客户端取消连接brokeron_disconnect()broker回应不连接请求
    publish()客户端发布:将消息发送至broker,然后从broker发送到订阅匹配主题的任何客户端on_publish()当要使用 publish()调用发送的消息已完成向broker的传输时
    publish.single()客户端发布单条信息,然后断开连接
    publish.multiple()客户端发布多条信息,然后断开连接
    subscribe()客户端订阅on_subscribe()当broker响应订阅请求时
    subscribe.simple()客户端订阅一组topics并返回收到的消息
    on_message()默认返回订阅消息
    message_callback_add()定义并处理特定订阅筛选器(包括通配符)的传入消息的callback
    message_callback_remove()移除特定订阅筛选器的callback
    unsubscribe()客户端取消订阅on_unsubscribe()当broker响应取消订阅请求时
    1. 变量介绍
    • 如果使用 message_callback_add()和 on_message,则只有与前者中定义的订阅筛选器不匹配的消息才会传递到on_message回调。
    • message_callback_add()只能一条一条的添加

    参考

    发布

    import paho.mqtt.client as mqtt
    
    # 连接成功回调
    def on_connect(client, userdata, flags, rc):
        print('Connected with result code '+str(rc))
        client.subscribe('testtopic/#')
    
    # 消息接收回调
    def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))
    
    client = mqtt.Client()
    
    # 指定回调函数
    client.on_connect = on_connect
    client.on_message = on_message
    
    # 建立连接
    client.connect('broker.emqx.io', 1883, 60)
    # 发布消息
    client.publish('emqtt',payload='Hello World',qos=0)
    
    client.loop_forever()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    订阅

    import paho.mqtt.client as mqtt
    
    # 连接成功回调
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code "+str(rc))
    
    # 消息接收回调,msg就是接收到的消息内容
    def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))
    
    # 创建实例
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    
    # 将客户端连接到broker,60为间隔
    client.connect("mqtt.eclipseprojects.io", 1883, 60)
    client.subscribe('mqtt_topic', qos=0)
    
    # 保持连接不中断
    client.loop_forever()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    关于连接

    以上示例中,我们均是通过1883端口进行连接并订阅/发布主题,但是对于ssl监听端口,如默认的8883端口,连接的代码则需要部分修改:

    client = mqtt.Client('broker.emqx.io', 8883)
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.check_hostname = False
    client.tls_set_context(context)
    client.connect()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    kafka

    kafka-高产出的分布式消息系统(A high-throughput distributed messaging system)。
    Kafka是一个高吞吐、分布式、基于发布订阅的消息系统,利用Kafka技术可以在廉价的PC Server上搭建起大规模消息系统。
    (和上面的MQTT很类似,只不过发布端变成了生产者, 订阅端变成了消费者,都通过代理进行消息的传输,相应的消息都在对应topic中)

    kafka-python包

    kafka-python中有以下几个重要的对象:

    • Topic:主题,一组消息数据的标记符;
    • Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
    • Consumer:消费者,获取数据,可消费指定的Topic;
    • Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
    • Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。
    1. 生产者实例参考
    import json
    from kafka import KafkaConsumer, KafkaProducer
     
     
    class KProducer:
        def __init__(self, bootstrap_servers, topic):
            """
            kafka 生产者
            :param bootstrap_servers: 地址
            :param topic:  topic
            """
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda m: json.dumps(m).encode('ascii'), )  # json 格式化发送的内容
            self.topic = topic
     
        def sync_producer(self, data_li: list):
            """
            同步发送 数据
            :param data_li:  发送数据
            :return:
            """
            for data in data_li:
                future = self.producer.send(self.topic, data)
                record_metadata = future.get(timeout=10)  # 同步确认消费
                partition = record_metadata.partition  # 数据所在的分区
                offset = record_metadata.offset  # 数据所在分区的位置
                print('save success, partition: {}, offset: {}'.format(partition, offset))
     
        def asyn_producer(self,  data_li: list):
            """
            异步发送数据
            :param data_li:发送数据
            :return:
            """
            for data in data_li:
                self.producer.send(self.topic, data)
            self.producer.flush()  # 批量提交
     
        def asyn_producer_callback(self,  data_li: list):
            """
            异步发送数据 + 发送状态处理
            :param data_li:发送数据
            :return:
            """
            for data in data_li:
                self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
            self.producer.flush()  # 批量提交
     
        def send_success(self, *args, **kwargs):
            """异步发送成功回调函数"""
            print('save success')
            return
     
        def send_error(self, *args, **kwargs):
            """异步发送错误回调函数"""
            print('save error')
            return
     
        def close_producer(self):
            try:
                self.producer.close()
            except:
                pass
     
    if __name__ == '__main__':
     
        send_data_li = [{"test": 1}, {"test": 2}]
        kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')
     
        # 同步发送(最慢,安全性最高)
        kp.sync_producer(send_data_li)
     
        # 异步发送(最快,安全性最低)
        # kp.asyn_producer(send_data_li)
     
        # 异步+回调(中等)
        # kp.asyn_producer_callback(send_data_li)
        
        kp.close_producer()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    1. 消费者实例
      暂略
  • 相关阅读:
    springboot+nodejs+vue工程师售后服务评价管理系统
    KEITHLEY 2182A + 6220/6221 系列测试系统软件
    15.springmvc源码解读之手写springmvc(简易版本)
    ardupilot开发 --- External LEDs篇
    使用Java实现哈夫曼编码
    从B-21轰炸机看美空军作战战略趋势
    数据可视化时代,为智慧城市建设添彩_光点科技
    程序调试技巧
    如何使用VSCode来查看二进制文件
    SQL刷题查漏补缺7
  • 原文地址:https://blog.csdn.net/cigarrrr/article/details/132804770