• 消费者偏移量_consumer_offsets相关解析


    1.概述

    __consumer_offsetskafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

    __consumer_offsets 的每条消息格式大致如图所示:

    在这里插入图片描述

    可以想象成一个 KV 格式的消息,key 就是一个三元组group.id+topic+分区号,而 value 就是 offset 的值。

    考虑到一个 kafka 集群中可能有很多consumerconsumer group,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions, 从而将负载分散到不同的 _consumer_offsets分区上。

    一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。

    2.消费者消费topic

    消费者组:hy-group topic:hy1-test-topic

    bin/kafka-console-consumer.sh --bootstrap-server  hadoop102:9092,hadoop103:9092,hadoop104:9092 --group hy-group --topic hy1-test-topic
    
    • 1

    3.生产者生产消息

    bin/kafka-console-producer.sh --broker-list  hadoop102:9092,hadoop103:9092,hadoop104:9092   --topic hy1-test-topic
    
    • 1

    在这里插入图片描述

    4.查看指定消费组的消费位置offset

    bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --describe --group hy-group
    
    • 1

    在这里插入图片描述

    从上图中可以看出:

    • 每个partition 对应的消费者id; 因为只开了一个消费者; 该消费者同时消费3个partition;
    • CURRENT-OFFSET: 当前消费组消费到的偏移量
    • LOG-END-OFFSET: 日志最后的偏移量
    • CURRENT-OFFSET = LOG-END-OFFSET 说明当前消费组已经全部消费了;

    此时关闭消费者之后, 再发送几条消息看看:

    partition-0 partition-1 partition-2LOG-END-OFFSET: 日志最后的偏移量分别增加了1; 但是CURRENT-OFFSET: 当前消费组消费到的偏移量 保持不变;因为没有被消费。

    重新打开一个消费者组,继续消费,此时偏移量也更新啦。
    在这里插入图片描述

    在这里插入图片描述

    5.从头开始消费 --from-beginning

    如果用新的消费组去消费一个Topic,那么默认该消费组的offset会是最新的; 即历史的不会消费。

    开启新的消费者组

    bin/kafka-console-consumer.sh --bootstrap-server   hadoop102:9092,hadoop103:9092,hadoop104:9092 --group hy-group1  --topic hy1-test-topic
    
    • 1

    查看消费情况

    bin/kafka-consumer-groups.sh --bootstrap-server  hadoop102:9092,hadoop103:9092,hadoop104:9092  --describe --group hy-group1
    
    • 1

    在这里插入图片描述

    可以看到CURRENT-OFFSET = LOG-END-OFFSET 。 如何让新的消费组/者 从头开始消费呢? 加上参数 --from-beginning

    # 从头开始消费
    bin/kafka-console-consumer.sh --bootstrap-server   hadoop102:9092,hadoop103:9092,hadoop104:9092 --group hy-group1  --topic hy1-test-topic --from-beginning
    
    • 1
    • 2

    6.如何确认 consume_group 在哪个__consumer_offsets-?

    Math.abs(groupID.hashCode()) % numPartitions

  • 相关阅读:
    lwIP 开发指南(中)
    大模型赛道如何实现华丽的弯道超车【赠书活动|第十期《分布式统一大数据虚拟文件系统 Alluxio原理、技术与实践》】
    架构扩展性
    KY191 矩阵幂(用Java实现)
    【Rust日报】2023-11-08 RustyVault -- 基于 rust 的现代秘密管理系统
    代码随想录Day61 | 503. 下一个更大元素 II | 42. 接雨水
    软件测试要学会哪些东西才能拿2w+的工资?
    寒假训练——第一周(STL)
    5个最好的乐高设计软件
    如何正确使用美容连锁店收银系统的众多功能?
  • 原文地址:https://blog.csdn.net/weixin_44852067/article/details/133351238