• kafka知识点


    总结

    MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费

    topic partition replication ;log segment message-interval

    配置说明默认值
    num.partitions分区数量1
    offsets.topic.replication.factorpartition备份数量1
    log.dirs日志目录,xxx.log文件存储位置/tmp/kafka-logs
    log.segment.bytesxxx.log文件大小1073741824bytes=1GB
    log.index.interval.bytes往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目
    减小这个值会增加索引条目
    4096bytes=4KB

    MQ初识

    MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
    MQ作用:

    • 解耦
    • 异步
    • 削峰,生产大于消费时,暂时存储消息
    • 可扩展

    MQ模式

    • publish/subscribe模式(发布/订阅):1个消息可被n个消费者消费
    • p2p模式:1个消息只被1个消费者消费

    p/s模式分类:

    • MQ主动推送,类似微信公众号
    • customer主动拉取,类似kakfa

    customer主动拉取优缺点:

    • 优点:MQ主动推送,不同的消费者消费能力不同
    • 缺点:customer需要轮询MQ中是否有新的消息

    kafka初识

    官网

    kafka:基于发布订阅模式的消息中间件,主要应用于大数据实时处理领域
    侧重点:数据的存储和读取,针对实时性比较高的流式数据处理场景

    特点:

    • 高吞吐:2ms,万亿级别消息/天
    • 高可用:集群内部节点动态扩展,集群之间动态扩展
    • 持久化:可持久化数据

    zookeeper初识

    kafka与zookeeper

    • 多个kk和1个zk可以构成1个集群
    • zk用来维护kk集群的元数据
    • kk0.9- offset存储在zk
    • kk0.9+ offset存储在kk(kk创建的topic中,topicName=_consumer_offsets)
    • offset(偏移量)记录了消费者消费的partition中消息位置(消费到第几个消息了)

    kafka/整体结构

    在这里插入图片描述
    publish/subscribe模式

    • kk只支持“发布/订阅”模式

    producer(生产者):

    • 消息的生产者

    customer(消费者):

    • 消息的消费者
    • 消费者实时记录offset,以便出错恢复时,从上次的位置继续消费
    • customer位于customer group中

    customer group(消费者组):

    • 把消费者分配到1个组里
    • 1个消费者也能构成1个组

    broker(中间人):

    • 消息管理的中间人,1个kk服务,1个kk进程
    • 多个broker组成kk集群

    topic(话题):

    • 1个话题可以有很多消息
    • producer:topic=m:n,customer:topic=m:n
    • topic中含有partition数目,replication数目
    • topic:partition=1:n

    partition(分区,消息队列):

    • 存放消息
    • partition数量可以>broker数量
    • partition以轮询的方式分配到broker中
    • n个partition分布在不同的broker中(负载均衡,高并发)
    • topic:partition=1:n
    • partition:replication=1:n
    • 1个partition中的1个消息只能被1消费者组中的1个消费者消费
    • partition:log文件=1:1

    event(消息)

    • event由key、value、timestamp组成
    • 相同key的event会传输到相同的partition
    • events被消费之后不会被删除,可以配置events保存时间(保存events不会影响kafka的性能)

    replication(消息队列副本):

    • 备份partition的消息
    • replication数量不能>broker数量
    • 只有1个replication的partition只有leader
    • leader和follower同1个partition的n个replication角色
    • leader-replication和follower-replication一定分布在不同的broker中
    • 每一个replication都维护了1个offset(消息偏移量,消费到那个消息了)

    leader(消息队列领导者):

    • 多个replication应该要有领导者
    • leader负责读写消息,producer的写入和customer的读取

    follower(消息队列跟随者):

    • 多个replication应该要有跟随者
    • follower只负责备份leader消息

    kafka/整体架构

    在这里插入图片描述
    producer customer customer-group topic-partition-replication

    • customer位于customer group中
    • 1个customer group可以只有1个customer
    • topicA的partition=3,1个partition有replication=2个副本,
    • partition一定位于不同broker中,partition数量不能>broker数量
    • replication-leader和replication-follower一定位于不同broker中,replication数量可以>broker数量
    • replication-leader负责消息读写,replication-follower负责备份replication-leader
    • topic和partition是逻辑概念,replication是物理概念负责消息的读写备份
    • kafka只支持publish/subscribe模式

    kafka/配置

    配置说明默认值
    log.index.interval.bytes往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目
    减小这个值会增加索引条目
    4096bytes=4KB
    log.segment.bytesxxx.log文件大小1073741824bytes=1GB
    num.partitions分区数量1
    offsets.topic.replication.factorpartition备份数量1
    log.dirs日志目录,xxx.log文件存储位置/tmp/kafka-logs

    kafka/稀疏索引 确定消息位置

    在这里插入图片描述
    说明:

    • 主要由xxx.index, xxx.log, xxx.timeindex文件
    • 文件名相同则属于同1个segment
    • 文件名以文件内容第1个offset命名
    • log.dirs确定log文件存储目录,log.segment.bytes确定segment的大小,log.index.interval.bytes认为是
      offset(偏移量)的单位
    • 当xxx.log文件>log.segment.bytes 将生成新的segment(xxx.index, xxx.log, xxx.timeindex),文件名以文件内容第1个offset命名
    • 确定消息位置:offset》xxx.index文件名》xxx.index内容(二分查找一段消息)》xxx.log内容(具体查找某个消息)

    00000000000000000000.index

    offset: 5077 position: 1067163526
    offset: 5078 position: 1068068399
    offset: 5079 position: 1068973272
    
    • 1
    • 2
    • 3

    00000000000000000000.timeindex

    00000000000000000000.timeindex
    timestamp: 1660291675000 offset: 5077
    timestamp: 1660291675003 offset: 5078
    timestamp: 1660291675008 offset: 5079
    
    • 1
    • 2
    • 3
    • 4

    00000000000000000000.log

    position: 1067163526 
    	message
    	message
    position: 1068068399 
    	message
    	message
    	message
    position: 1068973272 
    	message
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述
    topic partition log segment .log .index .timeindex:

    • topic:partition=1:n
    • partition:log=1:1
    • log:segment=1:n
    • log和segment是逻辑概念,.log, .index, .timeindex是物理文件
    • 1个segment有1个.log,1个.index文件和1个.timeindex文件
    • xxx.log存储消息,xxx.index存储offset|position,xxx.timeindex存储timestamp|offset
    • 同1个segment的xxx.log, xxx.index, xxx.timeindex的文件名相同,以文件内容第1个offset命名

    xxx.index:

    • offset|下一个log.index.interval.bytes的起始物理位置,一段消息的offset(偏移量)
    • 因为offset记录的是1段消息的偏移量,不是具体某个消息的偏移量,所以称作稀疏索引
    • 因为offset是递增有序的,所以可以使用二分查找,定位一段消息

    xxx.log: position|message

    xxx.timeindex:添加offset的毫秒timestamp|offset,什么时间加的offset

    kafka/offset

    offset是什么

    • offset是某个consumerGroup消费某个topic中的某个partition中的消息产生的
    • [consumerGroup|topic|partition] 唯一确定offset

    offset存储位置

    • kk0.9-版本保存在zk
    • kk0.9+版本 kafka

    kk0.9-版本offset保存在zk

    kafka默认1s之后向zk提交数据
    [controller brokers consumers]
    - controller 的竞选策略是竞争,那个broker先在zk注册,谁就是controller
    - brokers 所有的broker信息
    - consumers 是consumerGroup
    
    brokers/
    [ids,topics,seqid]
    - ids 中是所有broker的id
    - topics 中是所有topic
    - seqid 中是消息的序号 Exactly once 
    
    consumers/
    [console-consumer-88500、console-consumer-70850]
    - console-consumer-88502 是消费者组,没有指定consumerGroup的名字时console-consumer-随机5个数字
    
    console-consumer-88500/
    [ids,owners,offsets]
    
    offsets/
    [topicA]	
    - topicA是topic name
    
    topicA/
    [0,1]
    - 0,1都是partition name
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    kk0.9+版本offset保存在kk(保存为topic:_consumer_offsets)

    保存为系统内置的topic:_consumer_offsets
    想消费系统内置topic,修改配置文件consuemr.properties,`exclude.internal.topics=false`
    对于topic,_consumer_offsets,普通consumer是producer
    创建系统consumer消费topic,_consumer_offsets,才能看到_consumer_offsets存储的offset的结构
    普通customer消费topic和partition都不变,则生成的offset消息的key不变,提交到系统topic(_consumer_offsets)的partition不变
    
    • 1
    • 2
    • 3
    • 4
    • 5

    kafka/高效消息队列

    高效消息队列:

    • 多partition并行
    • 顺序写磁盘
    • 零拷贝

    顺序写磁盘:数据原来是按簇离散存储到磁盘中的,kk是顺序写到磁盘中,减少了磁盘寻址时间(磁盘访问时间=寻道时间+寻找扇区时间+读字节时间)

    传统IO:在内存的内核空间和用户空间来回拷贝
    在这里插入图片描述

    零拷贝:内核空间和用户空间都没有拷贝出现。实际上还是会拷贝一些元信息到socket cache
    在这里插入图片描述

    kafka/zookeeper

    kk2.8.0- 必须要zookeeper
    kafka进程broker和zookeeper结合构成集群

    ZK作用:

    • 选举某个broker为controller
    • 抢占式选举,那个broker先向zk注册这个broker就被当做leader

    controller(某个broker)作用

    • 管理集群中broker的上下线
    • 管理集群中topic的partition和replication分配
    • 管理集群中replication的leader选举

    ISR中leader选举:

    • 先获取ISR,再更新leader和ISR
    • ISR中leader挂了会通知到zk,isr_change_notification

    kafka/消息同步策略ISR(In-Sync Replicas)

    ISR(In-Sync Replicas):保持同步的副本,包括leader+n个follower,ISR中不包含所有follower
    ISR作用:

    • 新leader选举
    • follower的选择
    • ack的发送时机

    follower的选择-kk老版本:

    • leader和follower消息的差距数量,配置replica.lag.max.messages
    • leader和follower之间的通信时间,配置replica.lag.time.max.ms

    follower的选择-kk新版本:

    • leader和follower之间的通信时间,配置replica.lag.time.max.ms

    kafka/消息同步策略ISR(In-Sync Replicas)/问题

    问题:为什么kk新版本不要“消息差距标准了”
    首先,两个条件都要将造成follower反复进出

    • 如果replica.lag.max.messages=10
    • 生产者是批量发送12条消息》leader
    • 此时ISR中所有follower消息条数与leader都差距12条,ISR中所有follower被踢出
    • follower和leader之间的通信时间<replica.lag.time.max.ms规定的时间
    • ISR又把这些踢出去的follower全部加入
    • 上面两个条件放到造成反复选入和踢出follower,类似操作系统不好的页面置换算法造成页面抖动
    • 另外ISR的元数据需要同步给zk

    此外,leader和follower的通信时间标准变相保证了leader和follower消息的差距数量标准,leader和follower的通信时间延迟越短,消息同步速度自然越快,leader和follower的消息数量差距也就减少了。

    kafka/消息同步策略/HW和LEO customer

    LEO(Lag End Offset):ISR中每个replication(leader,follower)最后1个offset
    HW(High Watermark):ISR中最小的LEO(最大offset中的最小值)

    HW和LEO保证customer消费消息的一致性

    • HW之前的消息,customer才能够消费
    • ISR中选取的新leader中消息数量一定>=HW
    • ISR中不论那个follower成为leader后,消费者都能正常消费

    kafka/可靠性机制ACK producer

    producer没有收到ack的消息才重传
    request.required.acks可取值为0/1/-1
    request.required.acks=0:不会给producer响应ack
    request.required.acks=1:leader消息落盘之后返回ack给producer
    request.required.acks=-1:ISR中follower(不是所有follower)从leader同步消息落盘后发送ack给leader

    request.required.acks=-1发送ack流程:

    1. producer批量发送消息到kk
    2. 批量消息到达leader并落盘
    3. ISR中follower同步数据并落盘完成
    4. 发送ack给leader

    kafka/可靠性机制ACK/问题/消息丢失

    request.required.acks=0:不会给producer响应ack
    request.required.acks=1:producer发送消息》leader接收消息并落盘后》leader所在broker宕机(数据丢失)》kafka重新选举leader》kafka消息丢失
    request.required.acks=-1:当ISR中只有1个leader没有follower时,退回到ack=1的情况

    • offsets.topic.replication.factor=1副本数被设置为1,当ISR中只有1个leader没有follower,退回到ack=1的情况
    • 在ISR重新选入follower期间,ISR中只有1个leader提供服务,退回到ack=1的情况
    • 保证ISR中至少有1个follower就可以避免消息丢失

    kafka/可靠性机制acks/问题/消息重复

    request.required.acks=-1:producer发送消息》leader接收消息并落盘后》ISR中follower从leader后同步消息并落盘后》leader宕机(发送ack失败)》producer没有收到ack》重新发送已经落盘的消息

    kafka/消息数量/Exactly once(有且只有1个消息)

    At least Once:这个消息在kafka中至少有1个,1~n,对应acks=-1,消息不会丢失,消息可能会重复
    At Most Once:这个消息在kafka中至多有1个,0~1,对应acks=0,消息可能丢失,消息不会重复
    Exactly Once:这个消息在kafka中有且只有1个,1,消息幂等性+At Least Once=Exactly Once

    kafka幂等性原理:唯一索引

    kafka Exactly once过程:
    1.将producer参数中enable.idempotence=true:producer会被分配1个PID,producer发送同1个partition的消息会附带SeqNumber
    2.broker缓存,那个producer往那个partition发送了第几个event

    kafka/消息数量/Exactly once/问题

    跨partition:

    • kafka的幂等性无法保证跨分区跨会话的Exactly Once。因为,,同1个消息发往不同的partition也无法去重,造成不满足Exactly once

    跨会话

    • kafka重启:kafka重启PID会发生变化
    • producer重启:producer重启PID会发生变化

    producer/kafka/事务

    ,

    已知Exactly once

    • 可以保证不跨会话(kk重启或producer重启)&&不跨partition情况下的“消息有且只有1个”
    • 实现方式,PID由broker提供给producer

    producer事务:跨会话&&跨partition的Exactly once,两跨之下依然能够保证生产消息的唯一性

    • producer提供一个全局唯一的Transaction ID
    • 1对1绑定TID和PID,
    • 当跨会话(kk重启或producer重启)之后可以通过TID获取PID

    Transaction Coordinator组件:负责producer事务的实现

    • producer使用TID可以从Transaction Coordinator获取事务状态(事务topic保存事务状态)

    customer事务:保证1个消息只被消费1刺

    • 事务能力弱于producer
    • consumer可以通过offset访问kk中任意位置的的消息
    • kk中segment(.log, .index)默认保存7天,segment过期删除时仍有消息没有消费,被删除的消息不能被消费

    producer/整体结构

    在这里插入图片描述
    producer:生产者使用send(ProducerRecord对象)方法发送构造的ProducerRecord对象
    interceptors:拦截器
    serializer:序列化器
    partitioner:分区器

    RecordAccumulator:记录累计器,是main线程和sender线程共享的对象
    RecordBatch:批量记录的封装

    sender:发送器,一次发送一个RecordBatch

    producer/分区分配策略

    在这里插入图片描述
    将producer发送的数据封装成 ProducerRecord对象

    • 指定partition,使用指定partition的值
    • 没有指定partition但指定key,hash(key)%topic可用的partition数量=partition,key不变消息放置的partition不变
    • 没有指定partition且没有指定key,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与%topic可用的partition数量=partition
    • 三种封装成ProducerRecod对象的方法的优先级别,从上到下优先级降低

    producer/interceptor 自定义拦截器

    ProducerInterceptor接口方法:

    • configure(configs):操作配置信息
    • onSend(ProducerRecord):操作ProducerRecord。在Serializer、Partitioner之前调用
    • onAcknowledgement(RecordMetadata, Exception):ACK,broker发送确认给producer。producer回调之前调用
    • close():producer.close()时才会调用interceptor.close()
    • interceptor运行在producer所处线程中

    customer/消费方式

    publish/subscribe模式分类:

    • MQ主动推送,类似微信公众号
    • customer主动拉取,类似kakfa

    customer主动拉取优缺点:

    • 优点:MQ主动推送,不同的消费者消费能力不同
    • 缺点:customer需要轮询MQ中是否有新的消息

    kafka customer:

    • 消费时传入timeout(等待时间)参数,customer消费时没有数据,等待timeout时间再去查看。

    customer/分区分配策略

    注意:

    • 组为单位:以customer-group为单位订阅topic,同1个customer-group的customer不会重复消息
    • 同组互斥:1个topic下的1个partition中的1个event只能被customer-group中的1个customer消费

    customer具有以下分区分配策略:

    • Range(范围): kk默认分区分配策略,一段范围内的消息分配给某个消费者
    • Round-Robin(轮询):消息只分配到customer-group
    • Sticky(粘性):

    Range(范围): 逐个topic

    • 排序partition:排序同1个topic中的partition
    • 排序customer:排序同1个customer-group的customer
    • customer被分配的消息数量:range=partition数量(同1topic)/customer数量(同1customer-group),余数交给同1customer-group的第1个customer
    • 缺点-首customer负载不公:topic数量很多,大量topic的partition数量/customer数量=有余数时,这些余数将被交给同1customer-group的第1个customer(range+余数+余数+…+余数 数量的消息)

    Round-Robin(轮询):面向订阅的所有topic

    • 轮询订阅的所有topic的所有partition,将订阅的所有topic的所有partition结合起来轮询
    • 缺点,消息只分配到customer-group,不能指定由组内那个customer消费

    Sticky(粘性):

    • kk 0.11+之后才出现的新策略

    customer/重置offset

    重置offset:

    • customer更换customer-group之后将重置offset
    • segment过期删除(offset也被删除了)
    • auto.offset.reset=latest/earliest, latest是默认方式, earliest等价于--from-begining, earliest不一定从0开始
    • 批量消费提交offset,customer只获取offset一次,把这个offset放入内存中,后面的消费直接去修改内存中的offset,批量消费后提交offset

    customer/提交offset

    提交offset:

    • 自动定时提交
    • 手动(同步/异步)提交

    手动(同步/异步)提交:

    • 可以在消费者成功处理完数据后再提交,避免消费者侧消息丢失
    • 手动同步提交:使用1个线程poll消息和提交offset,提交offset失败会自动重试
    • 手动异步提交:异步手动提交,主线程仍然去拉取数据,另开1个线程去提交offset,提交offset失败不会重试
    • 手动同步提交缺点:线程提交offset时会阻塞直到提交offset完成
    • 手动异步提交缺点:提交offset失败不会重试

    customer/提交offset/自动定时提交/问题

    消息漏消费:

    • customer处理失败,但offset提交成功
    • 举例,1s钟提交1次offset最大值,当consumer处理最后1条消息时处理失败,此时刚好1s钟仍然会提交最大的offset,造成消息失败消费

    消息重复消费:

    • customer处理成功,但offset提交失败
    • 举例,10s提交1次offset最大值,当consumer5s内处理完poll下来的一批数据并入数据库后,刚好第6sconsumer故障 offset没有提交,consumer重新启动获取offset值不是最新的

    customer/存储offset

    offset存储位置

    • kk0.9-版本保存在zk
    • kk0.9+版本 kafka

    customer/存储offset/问题

    消息重复消费:

    • zk或kk的offset存储方式,当offset提交失败,造成customer重复消费(重新从旧offset开始消费)
    • 重复消费:consumer先处理消息,再提交offset,可能造成重复消费
    • 消息漏消费:先提交offset,再consumer处理消息,可能消费失败,进而造成消息漏消费
    • 因此采用自定义存储offset,把customer处理消息和提交offset弄成1个原子性操作

    customer/存储offset/自定义存储offset

    customer分配的partition经过rebalance:如果由于customer数量发生变化,造成其他customer分配的partition发生变化,即经过rebalance,此时其他customr需要存储更多的offset
    监听rebalance:因此,需要1个ConsumerRebalanceListener监听consumer是否经过的rebalance,没有rebalance只存储原来分配的partition的offset,经过rebalance需要定位到新分配partition从故障customer最近提交的offset继续消费

    下面2个方法需要自己实现

    • 获取某partition最新的offset(比如,从mysql获取)
    • 提交该customer被分配的partition最新的offset(比如,提交到mysql)
    • mysql表设计同于kkconsumerGroup|topic|partition|offset

    问题

    消息默认持久化时间7天,kk中segment(.log, .index)默认保存7天
    消息消费顺序,生产者顺序就是消费的顺序

  • 相关阅读:
    仅个人记录:复现dotspatialdemo、打包、
    MySQL锁
    Seata-TCC模式
    JVM虚拟机:Java对象的头信息有什么?
    web3 从redux中拿出所有已完成订单 并渲染到对应的Table列表中
    【ASP.NET Core】MVC模型绑定:非规范正文内容的处理
    Linux 常用命令
    c++八股day2-虚函数表和虚函数表指针的创建时机
    java进阶编程思想(七天)
    kafka的安装和基本操作
  • 原文地址:https://blog.csdn.net/baidu_35805755/article/details/126290634