partition中,消息是不会删除的,只是会追加写入,所以写入的消息就是顺序的。
这种特性决定定了kafka可以消费历史消息,而且是按照消息的顺序消费指定的消息,不是只能从队头消费信息。
如:消费组assign-group-1和ass5part(5个分区)的Partition的偏移量关系,可以使用命令查看:
./kafka-consumer-groups.sh --bootstrap-server 192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092
–describe --group assign-group-1
| PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID |
|---|---|---|---|---|
| 0 | 5 | 5 | 0 | consumer-1 |
| 1 | 5 | 5 | 0 | consumer-1 |
| 2 | 5 | 5 | 0 | consumer-1 |
| 3 | 5 | 5 | 0 | consumer-2 |
| 4 | 5 | 5 | 0 | consumer-2 |
消费者和tipic不是一一对应的,而是一个consumer group和Topic中的一个partition的关系(Offset再Partition中连续编号而不是全局连续编号)。
这个对应关系统一放在服务端维护,早期消费者组和partition的offset直接维护再zk中,但是读写性能消耗太大。
现在就放在一个特殊的topic中,铭记叫做 _consumer_offsets,默认又50个分区:offsets.topic.num.partitions = 50,每个分区默认是一个replication。
./kafka-topics.sh --topic __consumer_offsets --describe --bootstrap-server 192.168.8.146:9092
GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者有编号)
OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据。
如何知道一个consumer group的Offset会存放在哪个分区:
Math.abs(“gp-assign-group-1”.hashCode()) % 50
当Broker有记录Offset的情况下,如果说新增一个新的消费者组去消费一个Topic的某个Partition,没有Offset的记录,这个时候应该从哪里开始消费?
什么情况下找不到offset,就是新增加的GroupId,还没有开始消费。当新添加的消费组应该从哪里开始消费,有配置参数可以设置:
auto.offset.reset
消费组的Offset是保存在Broker的,但是由消费者上报给Broker的。并不是消费者消费了消息,Offset就会更新,消费者必须有一个Commit动作。
消费者可以自动提交或者手动提交,由消费端的这个参数控制:
enable.auto.commit
auto.commit.interval.ms
提交分两种,一种是手动同步提交,一种是手动异步提交。如果提交失败,Broker的Offset不会更新,也就是下次会出现重复消费的情况。
如果分区数量跟消费者的数量一样,那就一人消费一个。如果是消费者比分区多,或者消费者比分区少,这个时候消费者跟分区的关系?
默认策略:RangeAssignor
案例:
// 两个消费者消费5 个分区(同一个消费者组)
KafkaConsumer<String,String> consumer1=new KafkaConsumer<String, String>(props);
KafkaConsumer<String,String> consumer2=new KafkaConsumer<String, String>(props);
// 订阅队列
consumer1.subscribe(Arrays.asList("ass5part"));
consumer2.subscribe(Arrays.asList("ass5part"));
producer.send(new ProducerRecord<String,String>("ass5part",0,"0","0"));
producer.send(new ProducerRecord<String,String>("ass5part",1,"1","1"));
producer.send(new ProducerRecord<String,String>("ass5part",2,"2","2"));
producer.send(new ProducerRecord<String,String>("ass5part",3,"3","3"));
producer.send(new ProducerRecord<String,String>("ass5part",4,"4","4"));
----consume1----offset = 4 ,key =0, value= 0, partition= 0
----consume1----offset = 4 ,key =1, value= 1, partition= 1
----consume1----offset = 4 ,key =2, value= 2, partition= 2
----consume2----offset = 4 ,key =3, value= 3, partition= 3
----consume2----offset = 4 ,key =4, value= 4, partition= 4

也可以修改策略
props.put(“partition.assignment.strategy”,“org.apache.kafka.clients.consumer.Ro
undRobinAssignor”);
随机IO和顺序IO的读写速度差距巨大,kafka使用的顺序写
kafka提供了两个索引:offsetindex timestampindex
将消息变成一个批量文件,可以进行合理压缩减少IO
操作系统有两个内存空间:用户空间、内核空间


