• kafka


    消息队列流派

    有Broker的MQ

    • 这个流派通常有一台服务器作为Broker,所有消息都通过它中专。生产者把消息发送给他就结束自己的任务了,由Broker把消息推送给消费者;kafka和rabbitmq(rabbitmq中的交换机就是broker)都属于这个流派。RocketMQ的是基于kafka的原理制作的,功能比kafka多
    • kafka属于重topic的消息队列,整个broker需要依据topic进行消息中转,必然有topic的存在
    • rabbitmq属于轻topic的消息队列,topic中转只属于其中一种中转模式,即主题模式

    无Broker的MQ

    • 无Broker的MQ代表是zeroMQ,这里MQ是更高级的Socket,所以zeroMQ被设计成了一个库,而不是中间件,这种实现可以达到没有Broker的目的
    • 每个节点本身既是生产者又是消费者,独自维护自身的队列。ZeroMQ做的就是封装一套类似Socket的API来完成发送数据读取数据

    kafka基础

    kafka是一个分布式、支持分区的(partition)、多副本的(replica)、基于zookeeper的协调式的分布式消息系统,最大的特性就是可以实时处理大量数据来满足各种需求场景:比如hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎、web/nginx日志、访问日志、消息服务等

    使用场景

    • 日志收集:收集系统的各种log
    • 消息系统:解耦和生产消费模式、缓存消息等
    • 用户活动跟踪:记录用户的各种活动,如浏览网页、搜索、点击等,这些活动被各个服务器发布到kafka的topic中。然后订阅者通过订阅这些topic来做实时的监控分析
    • 运营指标:kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据

    基本概念

    名词解释
    Broker消息中间件处理结点,一个Kafka结点就是一个Broker,一个或多个Broker可以组成一个Kafka集群
    TopicKafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
    Producer消息生产者,像Broker发送消息的客户端
    Consumer消息消费者,从Broker读取消息的客户端
    ConsumerGroup每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group只能有一个Consumer能够消费该消息
    Partition物理上的概念,一个topic可以分成多个partition,每个partition内部的消息是有序的

    Kafka基本使用

    • 通过kafka命令像zk中创建一个topic
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    • 1
    • 查看zk中的所有topic
    ./kafka-topics.sh --list --zookeeper localhost:2181
    
    • 1
    • 打开发送消息客户端,可以从本地文件读取内容,或者从命令行直接输入,这些内容会被发送到broker中
    ./kafka-console-producer.sh --broker-list 192.168.164.1:9092 --topic test
    
    • 1
    • 打开消费消息客户端,会获取到内容到命令行输出,默认是消费最新的消息
    # 从最后一条消息的偏移量+1开始消费,即消费端连接之前已存的消息不会被这个消费端消费
    ./kafka-console-consumer.sh --bootstrap-server 192.168.164.1:9092 --topic test
    
    # 从头开始消费
    ./kaka-console-consumer.sh --bootstrap-server 192.168.164.1:9092 --from-beginning --topic test
    
    • 1
    • 2
    • 3
    • 4
    • 5

    关于消息的细节

    1. 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中,每一个从头开始消费的消费端都可以获得这些消息,这一点和rabbitmq不同,rabbitmq中的消息经过交换机转发到队列中,每个消费短链接到队列后会将队列中的消息移除
    2. 消息的保存是有序的,通过offset偏移量来描述消息的有序性
    3. 消费者消费消息时也是通过offset来描述当前需要消费的消息的位置

    消费组

    单播消息
    如果多个消费者在一个消费组中,那么只有一个消费者可以收到订阅的topic中的消息,换言之,同一个消费组只能有一个消费者收到一个topic中的消息,在创建消费客户端时通过指定消费组的id来控制消费端属于哪个消费组

    ./kafka-console-consumer.sh--bootstrap-server 192.168.164.1:9092 --consumer-property group.id=testGroup --topic test
    
    • 1

    多播消息
    不同的消费者订阅同一个topic,如果这些消费者在不同的消费组中,这些消费者可以同时收到这些消息

    ./kafka-console-consumer.sh--bootstrap-server 192.168.164.1:9092 --consumer-property  group.id=testGroup1 --topic test
    ./kafka-console-consumer.sh--bootstrap-server 192.168.164.1:9092 --consumer-property  group.id=testGroup2 --topic test
    
    • 1
    • 2

    查看消费组的相关信息

    #查看当前topic下有哪些消费组
    ./kafka-consumer-groups.sh --bootstrap-server 192.168.164.1:9092 --list
    #查看消费组中的具体信息,比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
    ./kafka-consumer-groups.sh --bootstrap-server 192.168.164.1:9092 --describe --group testGroup
    
    • 1
    • 2
    • 3
    • 4
    • Current-offset:当前消费组的以消费偏移量
    • Log-end-offset:topic对应分区消息的结束偏移量
    • Lag:当前消费组未消费的消息数

    topic和partition

    主题Topic

    • 主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类,不同的topic会被订阅该topic的消费者消费
    • topic存在一个问题,即当消息非常多,需要大量空间来存储,因为消息是会被保存到log文件中的,为了防止这个log文件过大,kafka提出了分区的概念

    分区partition

    • 一个主题中的消息量非常大,因此可以通过分区的设置,来分布式存储这些消息。比如一个topic创建了3个分区,那么topic中的消息就会分别存放在这三个分区中
    • 分区存储可以解决统一存储时文件过大的问题
    • 提供了读写的吞吐量,读和写可以同时在多个分区中进行

    创建多分区的topic:

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions  2 --topic test1
    
    • 1

    kafka中消息日志文件保存的内容

    1. 00000.log:保存的就是消息本身
    2. _consumer_offsets-49:kafka内部创建了_consumer_offsets主题包含了50个分区,这个出题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护消费的主题的偏移量,即每个消费者会把消费的主题偏移量自主上报给kafka中的_consumer_offsets。因此kafka为了提升这个topic的并发性,默认设置了50个分区
      • 提交到哪个分区:通过hash函数hash(consumerGroupId)%_consumer_offsets分区数来求得
      • 提交到topic中的内容:key为consumerGroupId+topic+分区号,value就是当前offset的值
    3. 文件中保存的消息,默认是保存7天,7天到后消息会被删除

    分区消费的几个细节

    • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证分区的局部有序,但是多个partition的多个消费者的消费顺序得不到保证
    • partition的数量决定了消费组中的消费者数量,建议一个消费组中的消费者数量不要超过partition的数量,否则多出的消费者消费不到消息
    • 如果消费者挂了,会触发rebalance机制,让其他消费者来消费该分区

    kafka集群

    集群的搭建

    准备三个server.properties文件:

    • server0.properties
    broker.id=0
    listeners=PLAINNEXT://192.168.164.1:9092
    log.dir=/usr/local/data/kafka-logs-0
    
    • 1
    • 2
    • 3
    • server1.properties
    broker.id=1
    listeners=PLAINNEXT://192.168.164.1:9093
    log.dir=/usr/local/data/kafka-logs-1
    
    • 1
    • 2
    • 3
    • server2.properties
    broker.id=2
    listeners=PLAINNEXT://192.168.164.1:9094
    log.dir=/usr/local/data/kafka-logs-2
    
    • 1
    • 2
    • 3

    通过命令启动三台broker

    ./kafka-server-start.sh -daemon ../config/server0.properties
    ./kafka-server-start.sh -daemon ../config/server1.properties
    ./kafka-server-start.sh -daemon ../config/server2.properties
    
    • 1
    • 2
    • 3

    副本的概念

    副本是对分区的备份,在集群中,不同的副本会被部署在不同的broker上,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他都是follower

    • leader:kafka的写和读操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follwer中选举产生一个新的leader
    • follower:接收leader的同步数据
    • isr:可以同步和已同步的结点都会被存入到isr集合中,如果isr中的结点性能较差,会被踢出isr集合

    集群中有多个broker,创建topic时可以指明topic有多少个partition,消息会被拆分到不同的partition中存储,可以为partition创建多个副本,不同的副本存放在不同的broker里,某一个broker宕机,其他broker还可以保证正常的运行

    Java客户端操作Kafka

    生产者

    /***
     * @author shaofan
     * @Description kafka生产者
     */
    public class MyProducer {
        private final static String TOPIC_NAME="java-test";
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            // 配置kafka集群的地址端口
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.164.1:9092,192.168.164.1:9093,192.168.164.1:9094");
            // 配置键序列化方式,将发送的key序列化为字节数组
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 配置值序列化方式,将发送的value序列化为字节数组
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            // 配置生产者的消息确认机制
            props.put(ProducerConfig.ACKS_CONFIG,1);
            // 配置发送消息缓冲区大小
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
            // 配置kafka单次从缓冲区拉去的数据大小
            props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            // 当缓冲区剩余数据不足单词拉取的数据时,经过该时间ms会将剩余数据都拉到kafka
            props.put(ProducerConfig.LINGER_MS_CONFIG,10);
    
            // 根据配置创建kafka生产者对象
            Producer<String,String> producer = new KafkaProducer<>(props);
    
            // 需要发送的消息记录,指定需要发送的目标topic和消息键值,这里没有指定分区,会根据hash计算分区,也可以在第二个参数传入指定的分区号
            ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"1","hello");
    
            try{
                // 同步发送消息并获取元数据,在消息发送后会进入阻塞,等带消息结果
                RecordMetadata metadata = producer.send(producerRecord).get();
                if(metadata!=null){
                    System.out.println("同步发送消息结果:topic-"+metadata.topic()+"|partition-"+metadata.partition()+"|offset-"+metadata.offset());
                }
            }catch(Exception e){
                System.out.println("消息发送失败");
            }
    
    
            // 异步发送消息,消息发送后不会进入阻塞,在异步回调中获取消息结果
            producer.send(producerRecord, (metadata1, exception) -> {
                if(metadata1!=null){
                    System.out.println("异步发送消息结果:topic-"+ metadata1.topic()+"|partition-"+ metadata1.partition()+"|offset-"+ metadata1.offset());
                }
                if(exception!=null){
                    exception.printStackTrace();
                }
            });
            
            Thread.sleep(2000);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    ack相关配置

    • ack配置为0,此时kafka-cluster不需要任何broker收到消息,就会立即返回ack给生产者,最容易丢消息,效率也最高
    • ack配置为1,集群中的leader已经收到消息,并把消息写入到了log中,才会将ack返回给生产者,性能和安全性均衡
    • ack配置为-1/all,有默认的配置min.insync.replicas=2(默认为1,推荐大于等于2),此时就需要一个leader和一个follower同步完成之后才会返回ack给生产者,最安全,但性能最差

    消费者

    /***
     * @author shaofan
     * @Description kafka消费者
     */
    public class MyConsumer {
        private final static String TOPIC_NAME="java-test";
        private final static String CONSUMER_GROUP_NAME="testGroup";
    
        public static void main(String[] args) {
            Properties prop = new Properties();
            // 配置kafka地址
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.164.1:9092,192.168.164.1:9093,192.168.164.1:9094");
            // 配置消费组名
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
            // 配置反序列化
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            // 配置长轮询poll最大消息数
            prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
            // 消费者发送心跳的间隔
            prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
            // 如果超过这个时间kafka都没有接收到消费者的心跳,则会将这个消费者踢出消费组,进行rebalance,将分区分配给其他消费者
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
            // 如果两次消费的时间超出了这个时间间隔,则kafka认为这个消费者的性能较低,将它踢出消费组,进行rebalance
            prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);
    
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
            // 订阅topic
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            // 循环接收消息
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息:partition=%d|offset=%d|key=%s|value=%s",record.partition(),record.offset(),record.key(),record.value());
                }
    
                if(!records.isEmpty()){
                    // 手动同步提交offset
                    //consumer.commitSync();
    
                    // 手动异步提交offset,当前线程不会阻塞
                    consumer.commitAsync((offsets, exception) -> {
                        if(exception!=null){
                            exception.printStackTrace();
                        }
                    });
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    offset自动提交和手动提交

    • 自动提交:当消费者从broker拉取一条消息时,会立即提交当前消息的offset到kafka的_consumer_offsets中,如果消费没有完成就提交了offset,那么会导致消息丢失
    • 手动提交:在消费消息时或消息消费后,再提交offset,通过cosumer.commitSync同步提交或consumer.Async异步提交

    长轮询poll消息

    • 配置ConsumerConfig.MAX_POLL_RECORDS_CONFIG后,如果轮询到指定数量的消息,则开始执行
    • 如果没有poll到指定数量的消息,则继续轮询
    • 在poll时如果指定了时间,即使没有poll到指定数量的消息,时间到了也会停止轮询

    新消费组的消费offset规则
    新消费组中的消费者在启动后,默认从当前分区的最后一条消息的offset+1开始消费(消费新消息),可以通过配置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG为earliest第一次从头开始消费(默认是Lastest,即消费新消息)

    Springboot整合Kafka

    配置文件

    server:
      port: 8080
    
    spring:
      kafka:
        bootstrap-servers: 192.168.164.1:9092,192.168.164.1:9093,192.168.164.1:9094
        producer:
          # 重发次数,如果发送消息没有收到ack则会进行重试
          retries: 3
          # kafka单次从缓冲区读取的数据大小
          batch-size: 16384
          # 缓冲区大小
          buffer-memory: 33554432
          # ack模式,1表示需要leader收到消息
          acks: 1
          # 键值的序列化类
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # 消费组id
          group-id: default-group
          # 是否开启自动提交
          enable-auto-commit: false
          # 首次连接offset规则,earliest表示从第一条消息开始消费
          auto-offset-reset: earliest
          # 键值的序列化类
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 一次轮询最多拉取的消息数量
          max-poll-records: 500
        listener:
          # 消费者手动提交模式
          # record 当记录被消费者监听器处理后提交
          # batch 当一批poll的数据被消费者监听器处理后提交
          # time 当一批poll的数据被消费者监听器处理后,距离上次提交时间大于ack-time时提交
          # count 当一批poll的数据被消费者监听器处理后,被处理的消息数大于ack-count时提交
          # count_time count或time满足一个即提交
          # manual 当一批poll的数据被消费者监听器处理后,手动调用Acknowledgement.acknowledge()时提交
          # manual_immediate 只要调用Acknowledgement.acknowledge()即提交,一般使用这种
          ack-mode: manual_immediate
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    生产者

    @RestController
    @RequestMapping("msg")
    public class KafkaController {
        private final static String TOPIC_NAME="java-test";
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        @PostMapping("send")
        public String sendMessage(String msg){
            kafkaTemplate.send(TOPIC_NAME,0,"key",msg);
            return "success";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者

    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = "java-test",groupId = "MyGroup1")
        public void group1(ConsumerRecord<String,String> record, Acknowledgment acknowledgment){
            System.out.println(record.value());
            // 提交offset
            acknowledgment.acknowledge();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    消费者相关配置

    在@KafkaListener中可以通过topicPartitions属性指定多个@TopicPartition注解来描述消费的分区,通过concurrency属性来指定kafka创建的消费者数量;在@TopicPartition中可以指定消费的topic、partition、partitionOffsets等

    Kafka集群的Controller、rebalance和HW

    Controller

    Kafka集群中的broker在zk中创建临时序号结点,序号最小的结点(即最先创建的结点)将会作为集群的controller,负责管理整个集群中的所有分区和副本的状态:

    • 当某个分区的leader副本出现故障时,由Controller负责为该分区选举新的leader副本
    • 当检测到某个分区的ISR集合(可同步和已同步结点集合)发生变化时,由Controller负责通知所有broker更新元数据
    • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由Controller负责让分区被其它节点感知到

    Rebalance机制

    在消费者没有指明分区消费的前提下当消费组里的消费者和分区的关系发生变化,就会触发rebalance,这个机制会重新调整消费者消费哪个分区,在触发rebalance之前,消费者消费哪个分区有三种策略:

    • range:通过公式计算某个消费者消费哪个分区
    • 轮询:每个分区由消费者轮流消费
    • sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整,即某个消费者挂了之后,其他消费者不会改变原来需要消费的消息,只用新分配这个挂掉的消费者的任务

    HW和LEO

    HW俗称高水位,取一个partition对应的ISR中最小的LEO(log-end-offset)设为HWconsumer最多只能消费到HW所在的位置,另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息所有ISR中的replicas同步更新HW,保证了如果leader所在的broker失效,该消息仍然可以从新选举出来的leader中获取

    kafka线上问题

    防止消息丢失

    • 发送方:将ack设置为1或者-1/all可以防止消息丢失
    • 消费方:把自动提交offset改为手动提交

    防止消息重复消费

    在防止消息丢失的基础上,如果生产者发完消息,因为网络抖动没有收到ack,但是broker已经收到了消息,此时下生产者会进行重试,于是broker就会收到多条相同的消息,造成消息重复消费

    • mysql插入业务id作为主键,主键唯一,所以一次只能插入一条
    • 使用redis或zk的分布式锁,以业务id为锁,保证只有一次操作可以加上锁

    保证消息顺序消费

    • 生产者:保证消息按顺序生产,且消息不丢失,即使用同步发送方式
    • 消费者:topic只能设置一个partition,消费组中订阅这个topic的consumer只能有一个

    解决消息积压问题

    消息的消费者的消费速度远赶不上生产者生产消息的速度,就会导致kafka中大量数据没有被消费,当消息堆积雨来越多,会导致消费者寻址的性能越来越差,从而造成kafka性能降低,导致其他服务的访问性能也变慢,造成服务雪崩

    • 在消费者中,使用多线程,充分利用cpu来消费消息
    • 创建多个消费组,多个消费者,部署在其他机器上,加快消费速度

    延时消息队列

    • 创建多个topic,每个topic表示延时的间隔,如topic_5s延时5s执行的队列、topic_1m延时1min执行的队列、topic_30m延时30min执行的队列
    • 生产者发送消息到topic,并带上消息的发送时间
    • 消费者订阅响应的topic,消费时轮询消费整个topic的消息,如果消息的发送时间到现在的时间间隔超过了队列预设的时间,则消费该消息;没有超过则等待1分钟,再次尝试实现延时的效果
  • 相关阅读:
    星巴克推出Web3平台;天啦噜,AI绘画能007了;『决策算法』电子书;合成人脸数据集;面向数据的版本控制;前沿论文 | ShowMeAI资讯日报
    [IDE工具]Ubuntu18.04 VSCode版本升级
    FreeRTOS入门教程(同步与互斥)
    【Linux-Day10-信号量,共享内存,消息队列】
    三维模型3DTile格式轻量化压缩处理的数据质量提升方法分析
    java计算机毕业设计瀚绅睿茨二人二轮车租赁管理MyBatis+系统+LW文档+源码+调试部署
    .Net WebApi 中的 FromBody FromForm FromQuery FromHeader FromRoute
    自动化之RPA工具之UiBot
    Fabric升级智能合约
    淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)
  • 原文地址:https://blog.csdn.net/m0_48468380/article/details/127666274