• Integrating Kafka with Spring Boot


    Step 1: install Kafka. My previous article covers the detailed steps for installing it with Docker:
    https://blog.csdn.net/wuyongde_0922/article/details/126192435

    Create the project directly with IDEA's Spring Initializr wizard.
    Then set up the pom file:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.4.RELEASE</version>
            <relativePath/>
        </parent>
        <groupId>com.wyd</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>1.0.1</version>
        <name>springboot-kafka</name>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
                <scope>provided</scope>
            </dependency>

        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>
    

    Add the application.yml configuration:

    server:
      port: 8005
      servlet:
        context-path: /kafka
    spring:
      kafka:
        bootstrap-servers: 47.106.177.244:9092 # address of the Kafka broker(s); separate multiple addresses with commas
        consumer:
          group-id: consumer-group-wyd
          properties:
            session.timeout.ms: 15000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          auto-offset-reset: latest
          enable-auto-commit: true
          auto-commit-interval: 100ms
        producer:
          acks: 1
          retries: 3 # if greater than 0, the client resends records whose send failed
          batch-size: 16384 # when several records go to the same partition, the producer batches them into fewer requests, improving client and server performance; this sets the batch size in bytes (16384 is the default)
          buffer-memory: 33554432 # total bytes the producer may use to buffer records waiting to be sent (33554432 is the default)
          key-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer class for keys
          value-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer class for values
    kafka:
      topic:
        order: order
    

    Create the initialization bean class:

    package com.wyd.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class KafkaConfig {
    
      @Value("${kafka.topic.order}")
      private String orderTopic;
    
      /**
       * Create the topic. Declaring a NewTopic bean lets Spring Kafka's KafkaAdmin
       * create the topic on the broker at startup: 2 partitions, replication factor 1.
       */
      @Bean
      public NewTopic createOrderTopic() {
        log.info("Kafka creating topic: {}", orderTopic);
        return new NewTopic(orderTopic, 2, (short) 1);
      }
    
    
    }
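
    As a side note, spring-kafka 2.3+ (which Spring Boot 2.3.4 pulls in) also ships a TopicBuilder helper for the same job. A minimal sketch of the bean above rewritten with it, assuming an extra import of org.springframework.kafka.config.TopicBuilder (this is an illustrative alternative, not part of the original project):

      // Alternative form of the bean above, using TopicBuilder (spring-kafka 2.3+).
      // Requires: import org.springframework.kafka.config.TopicBuilder;
      @Bean
      public NewTopic createOrderTopic() {
        return TopicBuilder.name(orderTopic)
            .partitions(2)
            .replicas(1)
            .build();
      }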
    

    Producer utility class for sending messages:

    package com.wyd.config;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.wyd.entity.Message;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    import java.util.Date;
    
    @Component
    @Slf4j
    public class KafkaSender {
    
      private final KafkaTemplate<String, String> kafkaTemplate;
    
      // constructor injection of kafkaTemplate
      public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
      }
    
      private Gson gson = new GsonBuilder().create();
    
      public void send(String topicName, String msg) {
        try {
          // build the message
          Message message = new Message();
    
          message.setId(System.currentTimeMillis());
          message.setMsg(msg);
          message.setSendTime(new Date());
          log.info("Sending message >>> message: {}", gson.toJson(message));
    
          ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, gson.toJson(message));
          listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
              log.info("Send success callback: {}", gson.toJson(result.getProducerRecord().value()));
            }
            @Override
            public void onFailure(Throwable ex) {
              log.error("Send failure callback", ex);
            }
          });
        } catch (Exception e) {
          log.error("Send exception", e);
        }
    
      }
    
    }
    

    The Message entity class:

    package com.wyd.entity;
    
    import lombok.Data;
    
    import java.util.Date;
    
    /**
     * @author johnny
     * @create 2020-09-23 上午9:21
     **/
    @Data
    public class Message {
    
    
      private Long id;
    
      private String msg;
    
      private Date sendTime;
    }
    

    Consumer class:

    package com.wyd.hander;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class KafkaConsumer {
    
      @KafkaListener(topics = {"${kafka.topic.order}"}, groupId = "${spring.kafka.consumer.group-id}")
      public void receiveOrder(ConsumerRecord<String, String> record) {
          String topic = record.topic();
          String msg = record.value();
          log.info("Consumer 1 received message: topic-->" + topic + ", msg->>" + msg);
      }
    
    }
    

    Test controller:

    package com.wyd.controller;
    
    import com.wyd.config.KafkaSender;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Slf4j
    public class TestController {
    
    
      @Autowired
      private KafkaSender kafkaSender;
    
    
      @GetMapping("sendMessage/{msg}")
      public String sendMessage(@PathVariable("msg") String msg) {
          kafkaSender.send("order", msg);
          return "Kafka message sent";
      }
    
    }
    

    Startup class:

    package com.wyd;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class SpringbootKafkaApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringbootKafkaApplication.class, args);
        }
    
    }
    

    Access URL:
    http://localhost:8005/kafka/sendMessage/haha

    Check the console logs:
    Messages are sent and consumed normally.

    Notes:

    1) In the example above, if multiple instances run with the same group-id, this is point-to-point mode: each message is consumed by only one instance, and messages are acknowledged (committed) automatically.

    2) To use broadcast (publish-subscribe) mode, simply give each consumer a different value for spring.kafka.consumer.group-id while still listening to the same topic kafka.topic.order, as sketched below.
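
    A minimal sketch of a second listener in its own consumer group (the class name KafkaBroadcastConsumer and the group id consumer-group-broadcast are illustrative assumptions, not part of the original project):

    package com.wyd.hander;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class KafkaBroadcastConsumer {

      // Same topic as KafkaConsumer, but a different (hypothetical) group id,
      // so both listeners receive every message (broadcast / publish-subscribe).
      @KafkaListener(topics = {"${kafka.topic.order}"}, groupId = "consumer-group-broadcast")
      public void receiveOrder(ConsumerRecord<String, String> record) {
        log.info("Consumer 2 received message: topic-->" + record.topic() + ", msg->>" + record.value());
      }

    }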

    Project repository:
    https://gitee.com/wuyongde/springboot-kafka.git

  • Original article: https://blog.csdn.net/wuyongde0922/article/details/126195942