• Spring Boot配置多个Kafka数据源


    一、配置文件

    application.properties配置文件如下

    1. #kafka多数据源配置
    2. #kafka数据源一,日志审计推送
    3. spring.kafka.one.bootstrap-servers=172.19.12.109:32182
    4. spring.kafka.one.producer.retries=0
    5. spring.kafka.one.producer.properties.max.block.ms=5000
    6. #kafka数据源二,动环数据消费
    7. spring.kafka.two.bootstrap-servers=172.19.12.109:32182
    8. spring.kafka.two.producer.retries=0
    9. spring.kafka.two.producer.properties.max.block.ms=5000
    10. spring.kafka.two.consumer.group-id=bw-convert-data
    11. spring.kafka.two.consumer.enable-auto-commit=true

    二、pom依赖

    1. <dependency>
    2.     <groupId>org.springframework.kafka</groupId>
    3.     <artifactId>spring-kafka</artifactId>
    4. </dependency>

    三、生产者、消费者配置

    1.第一个kafka

    1. package com.gstanzer.convert.config;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.common.serialization.StringSerializer;
    4. import org.springframework.beans.factory.annotation.Value;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.kafka.annotation.EnableKafka;
    8. import org.springframework.kafka.core.*;
    9. import java.util.HashMap;
    10. import java.util.Map;
    11. @EnableKafka
    12. @Configuration
    13. public class KafkaOneConfig {
    14. @Value("${spring.kafka.one.bootstrap-servers}")
    15. private String bootstrapServers;
    16. @Value("${spring.kafka.one.producer.retries}")
    17. private String retries;
    18. @Value("${spring.kafka.one.producer.properties.max.block.ms}")
    19. private String maxBlockMs;
    20. @Bean
    21. public KafkaTemplate kafkaOneTemplate() {
    22. return new KafkaTemplate<>(producerFactory());
    23. }
    24. private ProducerFactory producerFactory() {
    25. return new DefaultKafkaProducerFactory<>(producerConfigs());
    26. }
    27. private Map producerConfigs() {
    28. Map props = new HashMap<>();
    29. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    30. props.put(ProducerConfig.RETRIES_CONFIG, retries);
    31. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
    32. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    33. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    34. return props;
    35. }
    36. }

    2.第二个kafka

    1. package com.gstanzer.convert.config;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.common.serialization.StringDeserializer;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import org.springframework.beans.factory.annotation.Value;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import org.springframework.kafka.annotation.EnableKafka;
    10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    11. import org.springframework.kafka.config.KafkaListenerContainerFactory;
    12. import org.springframework.kafka.core.*;
    13. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    14. import java.util.HashMap;
    15. import java.util.Map;
    16. @Configuration
    17. @EnableKafka
    18. public class KafkaTwoConfig {
    19. @Value("${spring.kafka.two.bootstrap-servers}")
    20. private String bootstrapServers;
    21. @Value("${spring.kafka.two.producer.retries}")
    22. private String retries;
    23. @Value("${spring.kafka.two.producer.properties.max.block.ms}")
    24. private String maxBlockMs;
    25. @Value("${spring.kafka.two.consumer.group-id}")
    26. private String groupId;
    27. @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    28. private boolean enableAutoCommit;
    29. @Bean
    30. public KafkaTemplate kafkaTwoTemplate() {
    31. return new KafkaTemplate<>(producerFactory());
    32. }
    33. @Bean
    34. KafkaListenerContainerFactory> kafkaTwoContainerFactory() {
    35. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    36. factory.setConsumerFactory(consumerFactory());
    37. factory.setConcurrency(3);
    38. factory.getContainerProperties().setPollTimeout(3000);
    39. return factory;
    40. }
    41. private ProducerFactory producerFactory() {
    42. return new DefaultKafkaProducerFactory<>(producerConfigs());
    43. }
    44. public ConsumerFactory consumerFactory() {
    45. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    46. }
    47. private Map producerConfigs() {
    48. Map props = new HashMap<>();
    49. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    50. props.put(ProducerConfig.RETRIES_CONFIG, retries);
    51. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
    52. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    53. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    54. return props;
    55. }
    56. private Map consumerConfigs() {
    57. Map props = new HashMap<>();
    58. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    59. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    60. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    61. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    62. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    63. return props;
    64. }
    65. }

    四.生产者

    1. @Controller
    2. public class TestController {
    3. @Autowired
    4. private KafkaTemplate kafkaOneTemplate;
    5. @Autowired
    6. private KafkaTemplate kafkaTwoTemplate;
    7. @RequestMapping("/send")
    8. @ResponseBody
    9. public String send() {
    10. final String TOPIC = "TOPIC_1";
    11. kafkaOneTemplate.send(TOPIC, "kafka one");
    12. kafkaTwoTemplate.send(TOPIC, "kafka two");
    13. return "success";
    14. }
    15. }

    五.消费者

    1. @Component
    2. public class KafkaConsumer {
    3. private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
    4. final String TOPIC = "TOPIC_1";
    5. // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
    6. @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
    7. public void listenerOne(ConsumerRecord record) {
    8. LOGGER.info(" kafka one 接收到消息:{}", record.value());
    9. }
    10. @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
    11. public void listenerTwo(ConsumerRecord record) {
    12. LOGGER.info(" kafka two 接收到消息:{}", record.value());
    13. }
    14. }

    备注:

    生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

    Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客

  • 相关阅读:
    微服务框架 SpringCloud微服务架构 5 Nacos 5.5 服务实例的权重设置
    JUC同步锁原理源码解析四----Semaphore
    WebRTC初识
    16进制的图片信息如何上传到FastDFS
    linux下 u2net tensorrt模型部署
    泛型的小结
    Spark和MR的本质区别
    Pandas统计列NaN值,这4步轻松搞定!
    md5在线加密解密是不是什么都能解密?为什么我的没有解出来呢?
    传奇版本添加npc修改增加npc方法以及配置参数教程
  • 原文地址:https://blog.csdn.net/qq_45443475/article/details/133928595