• kafka Consumer分析与速度特性


    1. kafka消费者原理

    1.1 Offset的维护

    1.1.1 Offset存储

    partition中,消息是不会删除的,只是会追加写入,所以写入的消息就是顺序的。
    这种特性决定定了kafka可以消费历史消息,而且是按照消息的顺序消费指定的消息,不是只能从队头消费信息。
    如:消费组assign-group-1和ass5part(5个分区)的Partition的偏移量关系,可以使用命令查看:

    ./kafka-consumer-groups.sh --bootstrap-server 192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092
    –describe --group assign-group-1

    PARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-ID
    0550consumer-1
    1550consumer-1
    2550consumer-1
    3550consumer-2
    4550consumer-2
    • CURRENT-OFFSET指的是下一个未使用的Offset
    • LOG-END-OFFSET:下一条等待写入消息的Offset
    • LAG是延迟量

    消费者和tipic不是一一对应的,而是一个consumer group和Topic中的一个partition的关系(Offset再Partition中连续编号而不是全局连续编号)。

    这个对应关系统一放在服务端维护,早期消费者组和partition的offset直接维护再zk中,但是读写性能消耗太大。
    现在就放在一个特殊的topic中,铭记叫做 _consumer_offsets,默认又50个分区:offsets.topic.num.partitions = 50,每个分区默认是一个replication。

    ./kafka-topics.sh --topic __consumer_offsets --describe --bootstrap-server 192.168.8.146:9092

    • 这些分区副本均匀分布在所有的Broker上
      topic里面是可以存放对象类型的value,主要存储两种对象:

    GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者有编号)
    OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据。

    如何知道一个consumer group的Offset会存放在哪个分区:

    Math.abs(“gp-assign-group-1”.hashCode()) % 50

    1.1.2 Group Offset无记录

    当Broker有记录Offset的情况下,如果说新增一个新的消费者组去消费一个Topic的某个Partition,没有Offset的记录,这个时候应该从哪里开始消费?
    什么情况下找不到offset,就是新增加的GroupId,还没有开始消费。当新添加的消费组应该从哪里开始消费,有配置参数可以设置:

    auto.offset.reset

    • latest,默认参数,也就是从最新的消息开始消费,历史消息是不能消费的
    • earliest,从最早发送的消息开始消费,可以消费到历史消息
    • none,如果Consumer Group在服务端找不到offset会报错

    1.1.3 Offset的更新

    消费组的Offset是保存在Broker的,但是由消费者上报给Broker的。并不是消费者消费了消息,Offset就会更新,消费者必须有一个Commit动作。
    消费者可以自动提交或者手动提交,由消费端的这个参数控制:

    enable.auto.commit

    • 默认是true,代表着消费以后自动提交

    auto.commit.interval.ms

    • 默认是5000,即5s

    提交分两种,一种是手动同步提交,一种是手动异步提交。如果提交失败,Broker的Offset不会更新,也就是下次会出现重复消费的情况。

    1.2 消费者消费策略(消费者与分区关系)

    1.2.1 消费策略

    如果分区数量跟消费者的数量一样,那就一人消费一个。如果是消费者比分区多,或者消费者比分区少,这个时候消费者跟分区的关系?

    默认策略:RangeAssignor
    案例:

    • 创建一个5个分区的Topic:
      ./kafka-topics.sh --create --bootstrap-server 172.0.0.1:9092 --partitions 5 --replication-factor 1 --topic ass5part
    • 启动两个消费者消费(同一个消费者组,目标是消费同一个Partition,不同的client id):
    // 两个消费者消费5 个分区(同一个消费者组)
    KafkaConsumer<String,String> consumer1=new KafkaConsumer<String, String>(props);
    KafkaConsumer<String,String> consumer2=new KafkaConsumer<String, String>(props);
    // 订阅队列
    consumer1.subscribe(Arrays.asList("ass5part"));
    consumer2.subscribe(Arrays.asList("ass5part"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 生产者
    producer.send(new ProducerRecord<String,String>("ass5part",0,"0","0"));
    producer.send(new ProducerRecord<String,String>("ass5part",1,"1","1"));
    producer.send(new ProducerRecord<String,String>("ass5part",2,"2","2"));
    producer.send(new ProducerRecord<String,String>("ass5part",3,"3","3"));
    producer.send(new ProducerRecord<String,String>("ass5part",4,"4","4"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 打印结果

    ----consume1----offset = 4 ,key =0, value= 0, partition= 0
    ----consume1----offset = 4 ,key =1, value= 1, partition= 1
    ----consume1----offset = 4 ,key =2, value= 2, partition= 2
    ----consume2----offset = 4 ,key =3, value= 3, partition= 3
    ----consume2----offset = 4 ,key =4, value= 4, partition= 4

    消费之分区使用
    也可以修改策略

    props.put(“partition.assignment.strategy”,“org.apache.kafka.clients.consumer.Ro
    undRobinAssignor”);

    2 kafka为什么快

    2.1 顺序写

    随机IO和顺序IO的读写速度差距巨大,kafka使用的顺序写

    2.2 索引

    kafka提供了两个索引:offsetindex timestampindex

    2.3 批量读写和文件压缩

    将消息变成一个批量文件,可以进行合理压缩减少IO

    2.4 零拷贝

    操作系统有两个内存空间:用户空间、内核空间

    • 用户空间:提供用户调用的接口,但是很多涉及到内核的命令是不能直接发出指令
    • 内核空间:可以执行任意命令,调用系统的一切资源,用户空间必须要通过一些系统接口才能向内核发出指令。
    1. read模型
      如果用户从磁盘读取数据,必须要想把数据拷贝到内核缓冲区,然后从内核缓存区到用户缓存区到用户缓冲区,最后才能返回给用户。
      在这里插入图片描述
    2. DMA拷贝
      DMA(Direct Memory Access),可以理解为cpu找到一个代理人,将数据传输任务代理给DMA控制器,解放cpu。
      io模型
      需要做4次用户态和内核太的切换,4次数据拷贝
      1. 把数据从磁盘拷贝到内存缓冲区
      2. 将数据拷贝到用户缓冲区
      3. 把数据拷贝到socket缓冲区
      4. 把数据拷贝到网卡设备
    3. 零拷贝
      linux提供了一个sendfile函数,可实现 ”零拷贝“。之歌时候就不需要经过用户态,直接把数据拷贝到网卡设备,
      因为这个只有DMA拷贝,没有CPU拷贝,所以叫做”零拷贝“。至少可以提高一倍的性能。
      在这里插入图片描述

    3 kafka消息不丢失配置

    3.1 Producer端配置

    1. Producer端使用producer.send(msg, callback)带有回调的send 方法,而不是producer.send(msg)方法。根据回调,一旦出现消息提交失败的情况,就可以有针对性地进行处理。
    2. 设置acks=all,acks是Producer的一个参数,代表”已提交“消息的定义,如果设置成all,表示所有Broker都收到消息,改消息才算是”已提交“。
    3. 设置retries为一个较大值。当网络抖动是可能出现发送失败,自动重试发送消息,尽量避免消息丢失。

    3.2 Broker端

    1. 设置unclean.leader.election.enable = false,在版本迭代中,反复修改过它的默认值,比较有争议。它控制哪些broker有资格竞选分区Leader。雨果一个Broker落后原先Leader太多,那么它一旦成为新的Leader,将导致消息丢失,所以一般都要将改参数设置为false
    2. 设置replication.factory >=3 需要三个以上的副本。
    3. 设置min.insync.replicas > 1。控制消息只好要被写入到多个副本才能算是也提交。设置成>1可以提升消息持久性。才生产环境一般要确保replication.factor > min.insync.replicas。如果两者相等,那么有一个副本离线,整个分区就无法正常工作了,推荐设置replication.factor =
      min.insync.replicas + 1

    3.3 Consumer端

    1. enable.auto.commit=false,确保消息消费完成在提交,并自己来处理offset的提交更新。
  • 相关阅读:
    黑马程序员spring+springMVC+Maven高级+springboot+MyBatisPlus总结之加载配置文件和容器
    物联网智能油井控制系统
    (附源码)spring boot动力电池数据管理系统 毕业设计 301559
    html 列表标签的学习,表单标签的学习
    【论文导读】 - 关于联邦图神经网络的3篇文章
    如何通过C#/VB.NET 代码调整PDF文档的页边距
    【Linux成长史】Linux基本指令大全
    Unity 控制物体透明度变化
    Android项目---拼图小游戏(下)
    鸿蒙全量源代码在线阅读
  • 原文地址:https://blog.csdn.net/weixin_43704834/article/details/126351876