MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
topic partition replication ;log segment message-interval
| 配置 | 说明 | 默认值 |
|---|---|---|
num.partitions | 分区数量 | 1 |
offsets.topic.replication.factor | partition备份数量 | 1 |
log.dirs | 日志目录,xxx.log文件存储位置 | /tmp/kafka-logs |
log.segment.bytes | xxx.log文件大小 | 1073741824bytes=1GB |
log.index.interval.bytes | 往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目 减小这个值会增加索引条目 | 4096bytes=4KB |
MQ隔离了消息生产和消息消费,解耦了消息生产和消息消费
MQ作用:
MQ模式
p/s模式分类:
customer主动拉取优缺点:
kafka:基于发布订阅模式的消息中间件,主要应用于大数据实时处理领域
侧重点:数据的存储和读取,针对实时性比较高的流式数据处理场景
特点:
kafka与zookeeper

publish/subscribe模式
producer(生产者):
customer(消费者):
customer group(消费者组):
broker(中间人):
topic(话题):
partition(分区,消息队列):
event(消息)
replication(消息队列副本):
leader(消息队列领导者):
follower(消息队列跟随者):

producer customer customer-group topic-partition-replication
| 配置 | 说明 | 默认值 |
|---|---|---|
log.index.interval.bytes | 往xxx.log中新写入4KB+的数据xxx.index才会增加1个条目 减小这个值会增加索引条目 | 4096bytes=4KB |
log.segment.bytes | xxx.log文件大小 | 1073741824bytes=1GB |
num.partitions | 分区数量 | 1 |
offsets.topic.replication.factor | partition备份数量 | 1 |
log.dirs | 日志目录,xxx.log文件存储位置 | /tmp/kafka-logs |

说明:
log.dirs确定log文件存储目录,log.segment.bytes确定segment的大小,log.index.interval.bytes认为是log.segment.bytes 将生成新的segment(xxx.index, xxx.log, xxx.timeindex),文件名以文件内容第1个offset命名00000000000000000000.index
offset: 5077 position: 1067163526
offset: 5078 position: 1068068399
offset: 5079 position: 1068973272
00000000000000000000.timeindex
00000000000000000000.timeindex
timestamp: 1660291675000 offset: 5077
timestamp: 1660291675003 offset: 5078
timestamp: 1660291675008 offset: 5079
00000000000000000000.log
position: 1067163526
message
message
position: 1068068399
message
message
message
position: 1068973272
message

topic partition log segment .log .index .timeindex:
offset|position,xxx.timeindex存储timestamp|offsetxxx.index:
offset|下一个log.index.interval.bytes的起始物理位置,一段消息的offset(偏移量)xxx.log: position|message
xxx.timeindex:添加offset的毫秒timestamp|offset,什么时间加的offset
offset是什么
offset存储位置
kk0.9-版本offset保存在zk
kafka默认1s之后向zk提交数据
[controller brokers consumers]
- controller 的竞选策略是竞争,那个broker先在zk注册,谁就是controller
- brokers 所有的broker信息
- consumers 是consumerGroup
brokers/
[ids,topics,seqid]
- ids 中是所有broker的id
- topics 中是所有topic
- seqid 中是消息的序号 Exactly once
consumers/
[console-consumer-88500、console-consumer-70850]
- console-consumer-88502 是消费者组,没有指定consumerGroup的名字时console-consumer-随机5个数字
console-consumer-88500/
[ids,owners,offsets]
offsets/
[topicA]
- topicA是topic name
topicA/
[0,1]
- 0,1都是partition name
kk0.9+版本offset保存在kk(保存为topic:_consumer_offsets)
保存为系统内置的topic:_consumer_offsets
想消费系统内置topic,修改配置文件consuemr.properties,`exclude.internal.topics=false`
对于topic,_consumer_offsets,普通consumer是producer
创建系统consumer消费topic,_consumer_offsets,才能看到_consumer_offsets存储的offset的结构
普通customer消费topic和partition都不变,则生成的offset消息的key不变,提交到系统topic(_consumer_offsets)的partition不变
高效消息队列:
顺序写磁盘:数据原来是按簇离散存储到磁盘中的,kk是顺序写到磁盘中,减少了磁盘寻址时间(磁盘访问时间=寻道时间+寻找扇区时间+读字节时间)
传统IO:在内存的内核空间和用户空间来回拷贝

零拷贝:内核空间和用户空间都没有拷贝出现。实际上还是会拷贝一些元信息到socket cache

kk2.8.0- 必须要zookeeper
kafka进程broker和zookeeper结合构成集群
ZK作用:
controller(某个broker)作用
ISR中leader选举:
ISR(In-Sync Replicas):保持同步的副本,包括leader+n个follower,ISR中不包含所有follower
ISR作用:
follower的选择-kk老版本:
replica.lag.max.messagesreplica.lag.time.max.msfollower的选择-kk新版本:
replica.lag.time.max.ms问题:为什么kk新版本不要“消息差距标准了”
首先,两个条件都要将造成follower反复进出
replica.lag.max.messages=10replica.lag.time.max.ms规定的时间此外,leader和follower的通信时间标准变相保证了leader和follower消息的差距数量标准,leader和follower的通信时间延迟越短,消息同步速度自然越快,leader和follower的消息数量差距也就减少了。
LEO(Lag End Offset):ISR中每个replication(leader,follower)最后1个offset
HW(High Watermark):ISR中最小的LEO(最大offset中的最小值)
HW和LEO保证customer消费消息的一致性
producer没有收到ack的消息才重传
request.required.acks可取值为0/1/-1
request.required.acks=0:不会给producer响应ack
request.required.acks=1:leader消息落盘之后返回ack给producer
request.required.acks=-1:ISR中follower(不是所有follower)从leader同步消息落盘后发送ack给leader
request.required.acks=-1发送ack流程:
request.required.acks=0:不会给producer响应ack
request.required.acks=1:producer发送消息》leader接收消息并落盘后》leader所在broker宕机(数据丢失)》kafka重新选举leader》kafka消息丢失
request.required.acks=-1:当ISR中只有1个leader没有follower时,退回到ack=1的情况
offsets.topic.replication.factor=1副本数被设置为1,当ISR中只有1个leader没有follower,退回到ack=1的情况request.required.acks=-1:producer发送消息》leader接收消息并落盘后》ISR中follower从leader后同步消息并落盘后》leader宕机(发送ack失败)》producer没有收到ack》重新发送已经落盘的消息
At least Once:这个消息在kafka中至少有1个,1~n,对应acks=-1,消息不会丢失,消息可能会重复
At Most Once:这个消息在kafka中至多有1个,0~1,对应acks=0,消息可能丢失,消息不会重复
Exactly Once:这个消息在kafka中有且只有1个,1,消息幂等性+At Least Once=Exactly Once
kafka幂等性原理:唯一索引
kafka Exactly once过程:
1.将producer参数中enable.idempotence=true:producer会被分配1个PID,producer发送同1个partition的消息会附带SeqNumber
2.broker缓存,那个producer往那个partition发送了第几个event
跨partition:
,同1个消息发往不同的partition也无法去重,造成不满足Exactly once跨会话
,
已知Exactly once
,PID由broker提供给producerproducer事务:跨会话&&跨partition的Exactly once,两跨之下依然能够保证生产消息的唯一性
Transaction Coordinator组件:负责producer事务的实现
customer事务:保证1个消息只被消费1刺

producer:生产者使用send(ProducerRecord对象)方法发送构造的ProducerRecord对象
interceptors:拦截器
serializer:序列化器
partitioner:分区器
RecordAccumulator:记录累计器,是main线程和sender线程共享的对象
RecordBatch:批量记录的封装
sender:发送器,一次发送一个RecordBatch

将producer发送的数据封装成 ProducerRecord对象
ProducerInterceptor接口方法:
publish/subscribe模式分类:
customer主动拉取优缺点:
kafka customer:
注意:
customer具有以下分区分配策略:
Range(范围): 逐个topic
Round-Robin(轮询):面向订阅的所有topic
Sticky(粘性):
重置offset:
auto.offset.reset=latest/earliest, latest是默认方式, earliest等价于--from-begining, earliest不一定从0开始提交offset:
手动(同步/异步)提交:
消息漏消费:
消息重复消费:
offset存储位置
消息重复消费:
customer分配的partition经过rebalance:如果由于customer数量发生变化,造成其他customer分配的partition发生变化,即经过rebalance,此时其他customr需要存储更多的offset
监听rebalance:因此,需要1个ConsumerRebalanceListener监听consumer是否经过的rebalance,没有rebalance只存储原来分配的partition的offset,经过rebalance需要定位到新分配partition从故障customer最近提交的offset继续消费
下面2个方法需要自己实现
consumerGroup|topic|partition|offset消息默认持久化时间7天,kk中segment(.log, .index)默认保存7天
消息消费顺序,生产者顺序就是消费的顺序