目录
本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。
- /**
- * 指定一个消费者组,一个主题主题。
- * @param record
- */
- @KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
- public void simpleConsumer(ConsumerRecord
record) { - System.out.println("进入simpleConsumer方法");
- System.out.printf(
- "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
- record.partition(),
- record.offset(),
- record.key(),
- record.value(),
- record.timestamp()
- );
- }
- /**
- * 指定多个主题。
- *
- * @param record
- */
- @KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
- public void topics(ConsumerRecord
record) { - System.out.println("进入topics方法");
- System.out.printf(
- "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value(),
- record.timestamp()
- );
- }
- /**
- * 监听一个主题,且指定消费主题的哪些分区。
- * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
- * @param record
- */
- @KafkaListener(
- groupId = APPLE_GROUP,
- topicPartitions = {
- @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
- },
- concurrency = "2"
- )
- public void consumeByPattern(ConsumerRecord
record) { - System.out.println("consumeByPattern");
- System.out.printf(
- "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value(),
- record.timestamp()
- );
- }
- /**
- * 指定多个分区从哪个偏移量开始消费。
- */
- @KafkaListener(
- groupId = APPLE_GROUP,
- topicPartitions = {
- @TopicPartition(
- topic = IPAD_TOPIC,
- partitions = {"0","1"},
- partitionOffsets = {
- @PartitionOffset(partition = "2", initialOffset = "10"),
- @PartitionOffset(partition = "3", initialOffset = "0"),
- }
- )
- },
- concurrency = "10"
- )
- public void consumeByPartitionOffsets(ConsumerRecord
record) { - System.out.println("consumeByPartitionOffsets");
- System.out.printf(
- "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value(),
- record.timestamp()
- );
- }
- /**
- * 指定多个主题。参数详解如上面的方法。
- * @param record
- */
- @KafkaListener(
- groupId = APPLE_GROUP,
- topicPartitions = {
- @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
- @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
- partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
- },
- concurrency = "4"
- )
- public void topics2(ConsumerRecord
record) { - System.out.println("topics2");
- System.out.printf(
- "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value(),
- record.timestamp()
- );
- }
- /**
- * 指定多个消费者组。参数详解如上面的方法。
- *
- * @param record
- */
- @KafkaListeners({
- @KafkaListener(
- groupId = APPLE_GROUP,
- topicPartitions = {
- @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
- @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
- partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
- },
- concurrency = "3"
- ),
- @KafkaListener(
- groupId = XM_GROUP,
- topicPartitions = {
- @TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
- @TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
- partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
- },
- concurrency = "3"
- )
- }
- )
- public void groupIds(ConsumerRecord
record) { - System.out.println("groupIds");
- System.out.println("内容:" + record.value());
- System.out.println("分区:" + record.partition());
- System.out.println("偏移量:" + record.offset());
- System.out.println("创建消息的时间戳:" + record.timestamp());
- }
- /**
- * 设置手动提交偏移量
- *
- * @param record
- */
- @KafkaListener(
- topics = IPHONE_TOPIC,
- groupId = APPLE_GROUP,
- //3个消费者
- concurrency = "3"
- )
- public void setCommitType(ConsumerRecord
record, Acknowledgment ack) { - System.out.println("setCommitType");
- System.out.println("内容:" + record.value());
- System.out.println("分区:" + record.partition());
- System.out.println("偏移量:" + record.offset());
- System.out.println("创建消息的时间戳:" + record.timestamp());
- ack.acknowledge();
- }