• Python操作Kafka基础教程


    01 Python操作Kafka基础教程

    创建ZooKeeper容器

    docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
    
    • 1

    创建Kafka容器

    语法是:

    docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[你的IP地址]:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[你的IP地址]:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
    
    • 1

    我的虚拟机IP是192.168.31.86,所以我的命令是:

    docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.86:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.86:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
    
    • 1

    安装可视化工具

    下载UI工具:https://kafkatool.com/download2/offsetexplorer_64bit.exe

    下载好以后按照默认进行安装。

    在这里插入图片描述

    在这里插入图片描述

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    在这里插入图片描述

    在这里插入图片描述

    连接Kafka

    搜索软件并打开:

    在这里插入图片描述

    在这里插入图片描述

    配置zookeeper:

    在这里插入图片描述

    配置Kafka:

    在这里插入图片描述

    点击Test测试按钮,测试是否能够连接Kafka:

    在这里插入图片描述

    点击是,然后就成功的使用客户端连接上Kafka了。

    在这里插入图片描述

    安装依赖

    安装Python3.8

    安装:

    pip install kafka-python==2.0.2
    
    • 1

    发布和消费json数据

    生产者

    from kafka import KafkaProducer
    import json
    
    # 创建生产者
    producer = KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 要提交的消息
    msg_dict = {
        "operatorId": "test",  # 公交公司ID
        "terminalId": "123",  # 设备Id
        "terminalCode": "123",  # 设备编码(使用车辆ID)
        "terminalNo": "1",  # 同一车辆内terminal序号从1开始
    }
    
    # 向指定的主题发送消息
    producer.send("text1", msg_dict)
    producer.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print(msg.value.decode())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    发布和消费文本数据

    生产者

    from kafka import KafkaProducer
    
    # 创建生产者
    producer = KafkaProducer(
        value_serializer=lambda v: v.encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 向指定的主题发送消息
    producer.send("text1", "你好")
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print(msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    发布和消费键值对文本数据

    生产者

    from kafka import KafkaProducer
    
    # 创建生产者
    producer = KafkaProducer(
        key_serializer=lambda v: v.encode('utf-8'),
        value_serializer=lambda v: v.encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 向指定的主题发送消息
    producer.send("text1", key="msg", value="你好")
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print("key=", msg.key.decode())
        print("value=", msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    发布和消费键值对JSON数据

    生产者

    from kafka import KafkaProducer
    import json
    
    # 创建生产者
    producer = KafkaProducer(
        key_serializer=lambda v: json.dumps(v).encode('utf-8'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 向指定的主题发送消息
    key = {"a": 1}
    value = {"b": 2}
    producer.send("text1", key=key, value=value)
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print("key=", msg.key.decode())
        print("value=", msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    发布和消费压缩文本数据

    生产者

    from kafka import KafkaProducer
    
    # 创建生产者
    producer = KafkaProducer(
        value_serializer=lambda v: v.encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092'],
        compression_type='gzip',  # 通过此参数声明要压缩数据传输
    )
    
    # 向指定的主题发送消息
    producer.send("text1", "你好")
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print(msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    同时消费多个主题

    生产者

    from kafka import KafkaProducer
    
    # 创建生产者
    producer = KafkaProducer(
        value_serializer=lambda v: v.encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 向指定的主题发送消息
    producer.send("text1", "你好")
    producer.send("text2", "你好")
    
    producer.send("text1", "你好1")
    producer.send("text2", "你好1")
    
    producer.send("text1", "你好2")
    producer.send("text2", "你好2")
    
    
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    consumer.subscribe(["text1", "text2"])
    for msg in consumer:
        print(msg)
        print(msg.topic)
        print(msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    获取发布结果

    生产者

    from kafka import KafkaProducer
    
    # 创建生产者
    producer = KafkaProducer(
        value_serializer=lambda v: v.encode('utf-8'),
        bootstrap_servers=['127.0.0.1:9092']
    )
    
    # 向指定的主题发送消息
    feature = producer.send("text1", "你好")
    
    # 会阻塞,直到发送成功
    print(feature.get(timeout=60))
    
    
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费者

    from kafka import KafkaConsumer
    
    # 创建消费者
    consumer = KafkaConsumer("text1", bootstrap_servers='127.0.0.1:9092')
    
    # 不停的消费数据
    for msg in consumer:
        print(msg.topic)
        print(msg.value.decode())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    【前端基础小案例】HTML+CSS打造精美选项卡菜单效果
    数据挖掘实战(4)——聚类(Kmeans、MiniBatchKmeans、DBSCAN、AgglomerativeClustering、MeanShift)
    等保2.0测评手册之Mysql
    Java开发一些偏冷门的面试题
    C# 高级 <一>
    【Python】基础数据结构:列表——元组——字典——集合
    Hadoop HA高可用集群模式搭建指南
    解决PyTorch中的RuntimeError: CUDA error: device-side assert triggered
    【C语言】多进程服务器
    Flutter的路由router-页面跳转
  • 原文地址:https://blog.csdn.net/qq_37703224/article/details/136164690