• kafka生产者和消费者(python版)


    生产者

    producer = KafkaProducer(bootstrap_servers=[”ip:port“])
    producer.bootstrap_connected()
    producer.send(self.topic_name_send,str.encode(json.dumps(message))).get()
    producer.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消费者

    消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题,

    consumer = KafkaConsumer(bootstrap_servers=["ip:port"], group_id="组名")
    tp = TopicPartition("主题名", 0)
    consumer.assign([tp])
    consumer.position(tp)
    # 修改用户组的偏移量为最新的
    # consumer.seek_to_beginning()
    last_offset = consumer.end_offsets([tp])[tp]
    data_list = []
    for msg in consumer:
        # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
        # 对数据进行解析为dict
        res = json.loads(msg.value)
        data_list.append(res)
        # 用于读取数据结束的退出
        if msg.offset == last_offset - 1:
            break
    consumer.commit()
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    kafka工具类

    此工具类基本上拿过去就可以用

    import datetime
    import json
    import time
    
    from kafka import KafkaProducer, KafkaConsumer, TopicPartition
    from env import enviroments,ENV
    import logging
    
    
    class KafkaHelper(object):
        def __init__(self, host=enviroments[ENV]['kafka']["host"], topic_send=enviroments[ENV]['kafka']['kafka_topic_send'],
                     topic_receive=enviroments[ENV]['kafka']['kafka_topic_receive'],
                     group_id = "python_test"):
            self.topic_name_send = str(topic_send)
            self.topic_name_receive = str(topic_receive)
            self.host = host
            self.group_id = group_id
    
    
        def send_msg(self, topic="test", msg="默认测试数据"):
            try:
                producer = KafkaProducer(bootstrap_servers=[self.host])
                producer.bootstrap_connected()
            except Exception as e:
                logging.error(f"发送消息时链接kafka失败,突出消息发送,失败原因:{e}")
                return
            message = {
                "topic": topic,
                "from": "python",
                "time": str(datetime.datetime.utcnow()),
                "data": msg
            }
            try:
                producer.send(self.topic_name_send,str.encode(json.dumps(message))).get()
            except Exception as e:
                logging.error(f"kafka发送数据失败,要发送的数据:{message},失败原因:{e}")
            finally:
                producer.close()
    
        # 消费者链接可能存在每次消费一个的情况,需要不断的创建和销毁消费者
        def get_one_data(self) -> list:
            try:
                consumer = KafkaConsumer(bootstrap_servers=[self.host], group_id=self.group_id)
                tp = TopicPartition(self.topic_name_receive, 0)
                consumer.assign([tp])
                consumer.position(tp)
            except Exception as e:
                logging.error(f"读取消息时链接kafka失败,突出消息发送,失败原因:{e}")
                return
            data_list = []
            try:
                for msg in consumer:
                    # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
                    # 对数据进行解析为dict
                    res = json.loads(msg.value)
                    data_list.append(res)
                    consumer.commit()
                    consumer.close()
            except Exception as e:
                logging.error(f"读取kafka单条失败,失败原因:{e}")
            finally:
                consumer.close()
                return data_list
    
        def receive_msg(self)->list:
            try:
                consumer = KafkaConsumer(bootstrap_servers=[self.host], group_id=self.group_id)
                tp = TopicPartition(self.topic_name_receive, 0)
                consumer.assign([tp])
                consumer.position(tp)
                # now_offset = consumer.offsets_for_times(timestamps=int(time.time()*1000000))
                # 修改用户组的偏移量为最新的
                # consumer.seek_to_beginning()
    
                print(consumer.config.keys())
                last_offset = consumer.end_offsets([tp])[tp]
                print(last_offset)
            except Exception as e:
                logging.error(f"读取消息时链接kafka失败,突出消息发送,失败原因:{e}")
                return
    
            data_list = []
            try:
                for msg in consumer:
                    # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
                    # 对数据进行解析为dict
                    res = json.loads(msg.value)
                    data_list.append(res)
                    if msg.offset == last_offset - 1:
                        break
                consumer.commit()
            except Exception as e:
                logging.error(f"读取kafka单条失败,失败原因:{e}")
            finally:
                consumer.close()
                return data_list
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    疑问

    1. 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    本人小白一枚,有什么不对的地方欢迎大家指出,也可以加q一起讨论技术哦(不要嫌弃我菜) 1147528161

  • 相关阅读:
    在线客服系统源码开发实战总结:需求分析及前端代码基本技术方案
    易基因: Nature Biotech:番茄细菌性青枯病的噬菌体联合治疗|国人佳作
    Scrum和Kanban方法的结合:Scrumban的实施指南
    ES相关学习
    英文论文(sci)解读复现【NO.12】YOLO-Tea: YOLOv5改进的茶叶病害检测模型
    web自动化测试入门篇02——selenium安装教程
    Java8新特性你知道哪些?
    web前端期末大作业 ——电影主题介绍 你好,李焕英 ——html+css+javascript网页设计实例
    2022 极术通讯-基于安谋科技 “星辰” STAR-MC1的灵动MM32F2570开发板深度评测
    android源码学习-android异常处理机制
  • 原文地址:https://blog.csdn.net/Caiabcd/article/details/126218675