• SpringBoot整合kafka消费者注解详解


    目录

    目标

    实战

    简单消费

    监听多个主题

    监听一个主题,指定分区消费消息

    指定多个分区,指定起始偏移量消费消息

    监听多个主题,指定多个分区,指定起始偏移量消费消息

    指定多个kafka监听器

    手动提交偏移量(需要配置手动提交偏移量配置)


    目标

    本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。


    实战

    简单消费

    1. /**
    2. * 指定一个消费者组,一个主题主题。
    3. * @param record
    4. */
    5. @KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
    6. public void simpleConsumer(ConsumerRecord record) {
    7. System.out.println("进入simpleConsumer方法");
    8. System.out.printf(
    9. "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
    10. record.partition(),
    11. record.offset(),
    12. record.key(),
    13. record.value(),
    14. record.timestamp()
    15. );
    16. }

    监听多个主题

    1. /**
    2. * 指定多个主题。
    3. *
    4. * @param record
    5. */
    6. @KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
    7. public void topics(ConsumerRecord record) {
    8. System.out.println("进入topics方法");
    9. System.out.printf(
    10. "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
    11. record.topic(),
    12. record.partition(),
    13. record.offset(),
    14. record.key(),
    15. record.value(),
    16. record.timestamp()
    17. );
    18. }

    监听一个主题,指定分区消费消息

    1. /**
    2. * 监听一个主题,且指定消费主题的哪些分区。
    3. * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
    4. * @param record
    5. */
    6. @KafkaListener(
    7. groupId = APPLE_GROUP,
    8. topicPartitions = {
    9. @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
    10. },
    11. concurrency = "2"
    12. )
    13. public void consumeByPattern(ConsumerRecord record) {
    14. System.out.println("consumeByPattern");
    15. System.out.printf(
    16. "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
    17. record.topic(),
    18. record.partition(),
    19. record.offset(),
    20. record.key(),
    21. record.value(),
    22. record.timestamp()
    23. );
    24. }

    指定多个分区,指定起始偏移量消费消息

    1. /**
    2. * 指定多个分区从哪个偏移量开始消费。
    3. */
    4. @KafkaListener(
    5. groupId = APPLE_GROUP,
    6. topicPartitions = {
    7. @TopicPartition(
    8. topic = IPAD_TOPIC,
    9. partitions = {"0","1"},
    10. partitionOffsets = {
    11. @PartitionOffset(partition = "2", initialOffset = "10"),
    12. @PartitionOffset(partition = "3", initialOffset = "0"),
    13. }
    14. )
    15. },
    16. concurrency = "10"
    17. )
    18. public void consumeByPartitionOffsets(ConsumerRecord record) {
    19. System.out.println("consumeByPartitionOffsets");
    20. System.out.printf(
    21. "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
    22. record.topic(),
    23. record.partition(),
    24. record.offset(),
    25. record.key(),
    26. record.value(),
    27. record.timestamp()
    28. );
    29. }

    监听多个主题,指定多个分区,指定起始偏移量消费消息

    1. /**
    2. * 指定多个主题。参数详解如上面的方法。
    3. * @param record
    4. */
    5. @KafkaListener(
    6. groupId = APPLE_GROUP,
    7. topicPartitions = {
    8. @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
    9. @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
    10. partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
    11. },
    12. concurrency = "4"
    13. )
    14. public void topics2(ConsumerRecord record) {
    15. System.out.println("topics2");
    16. System.out.printf(
    17. "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
    18. record.topic(),
    19. record.partition(),
    20. record.offset(),
    21. record.key(),
    22. record.value(),
    23. record.timestamp()
    24. );
    25. }

    指定多个kafka监听器

    1. /**
    2. * 指定多个消费者组。参数详解如上面的方法。
    3. *
    4. * @param record
    5. */
    6. @KafkaListeners({
    7. @KafkaListener(
    8. groupId = APPLE_GROUP,
    9. topicPartitions = {
    10. @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
    11. @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
    12. partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
    13. },
    14. concurrency = "3"
    15. ),
    16. @KafkaListener(
    17. groupId = XM_GROUP,
    18. topicPartitions = {
    19. @TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
    20. @TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
    21. partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
    22. },
    23. concurrency = "3"
    24. )
    25. }
    26. )
    27. public void groupIds(ConsumerRecord record) {
    28. System.out.println("groupIds");
    29. System.out.println("内容:" + record.value());
    30. System.out.println("分区:" + record.partition());
    31. System.out.println("偏移量:" + record.offset());
    32. System.out.println("创建消息的时间戳:" + record.timestamp());
    33. }

    手动提交偏移量(需要配置手动提交偏移量配置)

    1. /**
    2. * 设置手动提交偏移量
    3. *
    4. * @param record
    5. */
    6. @KafkaListener(
    7. topics = IPHONE_TOPIC,
    8. groupId = APPLE_GROUP,
    9. //3个消费者
    10. concurrency = "3"
    11. )
    12. public void setCommitType(ConsumerRecord record, Acknowledgment ack) {
    13. System.out.println("setCommitType");
    14. System.out.println("内容:" + record.value());
    15. System.out.println("分区:" + record.partition());
    16. System.out.println("偏移量:" + record.offset());
    17. System.out.println("创建消息的时间戳:" + record.timestamp());
    18. ack.acknowledge();
    19. }

  • 相关阅读:
    实现按钮悬停动画
    python自动化Selenium的使用
    Spring IOC和AOP
    【力扣】83. 删除排序链表中的重复元素
    [linux] 由创建用户开始
    Mac M1 安装Docker打包arm64的python项目的镜像包
    10分钟读懂数据响应式和双向绑定原理
    区块链(11):java区块链项目之页面部分实现
    C语言——求1/1-1/2+1/3-......+1/99-1/100的值
    学习笔记:物理渲染-间接光照
  • 原文地址:https://blog.csdn.net/qq_39706570/article/details/127872644