• kafka 消息偏移量


    [oswatch@yyjk tmp]$ cat consumerkafka_mario.py
    #!/usr/bin/env python
    # coding=utf-8
    from  kafka import KafkaConsumer
    from  kafka import TopicPartition
    import time
    def get_kafka_reviews(bootstrap_servers,topics):
        # print type(self.bootstrap_servers)
        consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers],auto_offset_reset='latest', enable_auto_commit=False)
        consumer.subscribe(topics=(topics))  #订阅要消费的主题
        print consumer.topics()
        print "111111",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=0)) #获取当前主题的最新偏移量
        print "222222",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=1)) #获取当前主题的最新偏移量
        print "333333",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=2)) #获取当前主题的最新偏移量
        time.sleep(30)

        review_list =[]
        for message in consumer:
            print message
            #print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
            review_list.append(message.value)
        return  review_list

    print get_kafka_reviews('1.1.1.27:9092','NewProxyBaseData')
    [oswatch@yyjk tmp]$ python consumerkafka_mario.py
    set([u'NewMongoErrorCount', u'PROXY7GeneratorIndex', u'IndexTimeOut', u'MARIOREGISTER', u'BASERECORD', u'ProxyTimeOut', u'NewProxyTimeOut', u'ProxyGatherCompleted', u'NewProxyIndexPrepare', u'mutableAlertInfo', u'NewProxyGather', u'PROXY6', u'PROXY7', u'PROXY4', u'PROXY6GeneratorIndex', u'PROXY2', u'NewProxyNoBaseData', u'PROXY0', u'PROXY1', u'PROXY1GeneratorIndex', u'ProxyBaseData', u'NewMongoReInsert', u'PROXY8', u'PROXY9', u'PROXY10GeneratorIndex', u'ProxyAlert', u'ProxyGather', u'IndexOnTime', u'EXALERTSETINFO', u'RANGEDATA', u'PROXY9GeneratorIndex', u'PROXY100', u'PROXY100GeneratorIndex', u'NewAlertCalStandBy', u'index', u'MessageStandbyBank', u'PROXY5GeneratorIndex', u'BaseRecord', u'alert', u'PROXY10', u'PROXY5', u'databus', u'NewProxyIndexDetail', u'datasend', u'alerttest', u'PROXY2GeneratorIndex', u'PUSHALERTSETSEND', u'indextest', u'PROXY3GeneratorIndex', u'SubIndexTimeOut', u'NewProxyGatherCompleted', u'LUFAX', u'DATAFLOW', u'PROXY3', u'register', u'NewMongoReUpsert', u'MarioAgent', u'MessageStandby', u'PROXY8GeneratorIndex', u'IndexGatherTimeOut', u'GatherIndexTimeOut', u'ALERTINFO', u'NewProxyBaseData', u'luohantest', u'PROXY4GeneratorIndex', u'ProxyIndexDetail', u'registerbank', u'NewProxyGatherNoCore'])
    111111 584672460
    222222 574728466
    333333 581566016

  • 相关阅读:
    Android P 性能优化:创建APP进程白名单,杀死白名单之外的进程
    【深度学习】——深度学习中的梯度计算
    python-线程池的使用
    Java线程池中的基础问题
    基于新版OpenCV5(C++)框架的DNN实现yolov3、4、5、6、7、x模型部署推理
    document.getElementByclassName()方法
    Java_Math类_Random类
    .net6 WebApi使用工厂模式+IOC
    2023年Q3季度国内手机大盘销额下滑2%,TOP品牌销售数据分析
    加密市场进入寒冬,是“天灾”还是“人祸”?
  • 原文地址:https://blog.csdn.net/zhaoyangjian724/article/details/126362392