application.properties配置文件如下
- #kafka多数据源配置
- #kafka数据源一,日志审计推送
- spring.kafka.one.bootstrap-servers=172.19.12.109:32182
- spring.kafka.one.producer.retries=0
- spring.kafka.one.producer.properties.max.block.ms=5000
- #kafka数据源二,动环数据消费
- spring.kafka.two.bootstrap-servers=172.19.12.109:32182
- spring.kafka.two.producer.retries=0
- spring.kafka.two.producer.properties.max.block.ms=5000
- spring.kafka.two.consumer.group-id=bw-convert-data
- spring.kafka.two.consumer.enable-auto-commit=true
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- package com.gstanzer.convert.config;
-
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.core.*;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
-  * Producer configuration for the first Kafka data source ("one").
-  *
-  * Reads its settings from the spring.kafka.one.* properties and exposes a
-  * String/String KafkaTemplate through the kafkaOneTemplate() bean method.
-  * No consumer factory or listener container factory is declared here.
-  */
- @EnableKafka
- @Configuration
- public class KafkaOneConfig {
-
- // Broker address list for data source one.
- @Value("${spring.kafka.one.bootstrap-servers}")
- private String bootstrapServers;
- // Producer retry count, passed to the Kafka client as the configured string.
- @Value("${spring.kafka.one.producer.retries}")
- private String retries;
- // Producer max.block.ms value, passed to the Kafka client as the configured string.
- @Value("${spring.kafka.one.producer.properties.max.block.ms}")
- private String maxBlockMs;
-
- /**
-  * Creates the template used to publish String messages to data source one.
-  *
-  * @return a KafkaTemplate backed by this class's producer factory
-  */
- @Bean
- public KafkaTemplate<String, String> kafkaOneTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
-
- /** Builds a producer factory from {@link #producerConfigs()}. */
- private ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- /**
-  * Assembles the producer properties for data source one.
-  *
-  * @return a mutable map holding the bootstrap servers, retries, max.block.ms
-  *         and String key/value serializers
-  */
- private Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
- }
- package com.gstanzer.convert.config;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.*;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
-  * Producer and consumer configuration for the second Kafka data source ("two").
-  *
-  * Reads its settings from the spring.kafka.two.* properties and exposes a
-  * String/String KafkaTemplate (kafkaTwoTemplate) and a listener container
-  * factory (kafkaTwoContainerFactory) that @KafkaListener methods can refer
-  * to through their containerFactory attribute.
-  */
- @Configuration
- @EnableKafka
- public class KafkaTwoConfig {
-
- // Broker address list for data source two (shared by producer and consumer).
- @Value("${spring.kafka.two.bootstrap-servers}")
- private String bootstrapServers;
- // Producer retry count, passed to the Kafka client as the configured string.
- @Value("${spring.kafka.two.producer.retries}")
- private String retries;
- // Producer max.block.ms value, passed to the Kafka client as the configured string.
- @Value("${spring.kafka.two.producer.properties.max.block.ms}")
- private String maxBlockMs;
- // Consumer group id used by listeners created from kafkaTwoContainerFactory.
- @Value("${spring.kafka.two.consumer.group-id}")
- private String groupId;
- // Whether the consumer commits offsets automatically.
- @Value("${spring.kafka.two.consumer.enable-auto-commit}")
- private boolean enableAutoCommit;
-
- /**
-  * Creates the template used to publish String messages to data source two.
-  *
-  * @return a KafkaTemplate backed by this class's producer factory
-  */
- @Bean
- public KafkaTemplate<String, String> kafkaTwoTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
-
- /**
-  * Creates the listener container factory for data source two.
-  *
-  * @return a concurrent container factory using {@link #consumerFactory()},
-  *         a concurrency of 3 and a poll timeout of 3000
-  */
- @Bean
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(3);
- factory.getContainerProperties().setPollTimeout(3000);
- return factory;
- }
-
- /** Builds a producer factory from {@link #producerConfigs()}. */
- private ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- /**
-  * Builds a consumer factory from {@link #consumerConfigs()}.
-  *
-  * @return a new String/String consumer factory for data source two
-  */
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- /**
-  * Assembles the producer properties for data source two.
-  *
-  * @return a mutable map holding the bootstrap servers, retries, max.block.ms
-  *         and String key/value serializers
-  */
- private Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
-
- /**
-  * Assembles the consumer properties for data source two.
-  *
-  * @return a mutable map holding the bootstrap servers, group id,
-  *         auto-commit flag and String key/value deserializers
-  */
- private Map<String, Object> consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
- }
-
- }
- /**
-  * Demo controller that publishes one test message through each Kafka template.
-  */
- @Controller
- public class TestController {
-
- // Two KafkaTemplate beans share the same type, so injection relies on the
- // field names matching the bean names (kafkaOneTemplate / kafkaTwoTemplate).
- @Autowired
- private KafkaTemplate<String, String> kafkaOneTemplate;
- @Autowired
- private KafkaTemplate<String, String> kafkaTwoTemplate;
-
- /**
-  * Sends a fixed message to TOPIC_1 through each template.
-  *
-  * @return the literal string "success"; the results of the send calls are not inspected
-  */
- @RequestMapping("/send")
- @ResponseBody
- public String send() {
- final String TOPIC = "TOPIC_1";
- kafkaOneTemplate.send(TOPIC, "kafka one");
- kafkaTwoTemplate.send(TOPIC, "kafka two");
-
- return "success";
- }
- }
- /**
-  * Demo listeners that log every record received on TOPIC_1, one listener per
-  * listener container factory.
-  */
- @Component
- public class KafkaConsumer {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
-
- // Topic shared by both listeners.
- static final String TOPIC = "TOPIC_1";
-
- // The containerFactory value must match the bean name of the
- // KafkaListenerContainerFactory declared in the configuration.
- // NOTE(review): the KafkaOneConfig shown in this document declares no
- // "kafkaOneContainerFactory" bean. Add a consumer factory and container
- // factory for data source one, or remove this listener; otherwise the
- // listener cannot be registered at startup.
- /**
-  * Logs the value of each record delivered by the "one" container factory.
-  *
-  * @param record the consumed record
-  */
- @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
- public void listenerOne(ConsumerRecord<?, ?> record) {
- LOGGER.info(" kafka one 接收到消息:{}", record.value());
- }
-
- /**
-  * Logs the value of each record delivered by the "two" container factory.
-  *
-  * @param record the consumed record
-  */
- @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
- public void listenerTwo(ConsumerRecord<?, ?> record) {
- LOGGER.info(" kafka two 接收到消息:{}", record.value());
- }
- }
备注:
生产者、消费者代码可参考以下链接,开发同学请根据实际情况按需调整代码:
Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客