• 【Springboot】整合kafka


    zookeeper安装

    安装zookeeper之前需要先安装JDK,我是用的JDK1.8,虚拟机是CentOS7
    步骤如下:

    1. 下载自己需要的版本,链接:https://pan.baidu.com/s/1Bkvy0ChTVH0hrKkxiE4ZDg 提取码:1zws

    2. 将JDK上传到安装目录并解压
      使用linux连接工具,将下载的JDK压缩包上传到要安装的目录下,这里使用的是windTerm,我是将压缩包上传到/usr/java(自己创建的)目录下,解压命令如下

      tar -zxvf jdk-8u152-linux-x64.tar.gz
      
      • 1

      解压后目录下面多个文件夹
      在这里插入图片描述

    3. 配置JDK环境变量,跟windows配置差不多

      vim /etc/profile
      
      • 1

      在文件最后面添加如下配置,JAVA_HOME是自己安装的目录,需要改为自己的目录

      export JAVA_HOME=/usr/java/jdk1.8.0_152
      export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/    
      export PATH=$PATH:$JAVA_HOME/bin
      
      • 1
      • 2
      • 3

      重新加载系统环境变量

      source /etc/profile
      
      • 1
    4. 验证是否安装成功

      java -version
      
      • 1

      出现如下内容说明安装成功
      在这里插入图片描述

    zookeeper安装

    1. 下载压缩包:我使用的是apache-zookeeper-3.8.0-bin.tar.gz
      官网地址:https://zookeeper.apache.org/releases.html

    2. 将zookeeper压缩包上传到安装目录并解压
      我的是/usr/zookeeper,上传并解压
      在这里插入图片描述

    3. 修改配置文件
      进入解压后zookeeper文件夹里面的conf目录下,该目录下有一个名zoo_sample.cfg文件,将该文件重新名为zoo.cfg,或者重新复制一份命名为zoo.cfg。使用vim命令打开zoo.cfg文件编辑,主要修改两个地方,dataDir,dataLogDir指定的目录,具体目录自己创建
      在这里插入图片描述

    4. 启动zookeeper
      进入zookeeper安装目录下的bin目录
      在这里插入图片描述
      进入bin目录如下
      在这里插入图片描述

      启动zookeeper服务端:./zkServer.sh start
      在这里插入图片描述

      查看启动状态:./zkServer.sh status
      在这里插入图片描述
      启动zookeeper客户端

      [root@localhost bin]# ./zkCli.sh
      
      • 1

      在这里插入图片描述

    kafka安装(非集群)

    1. 首先是下载自己需要的版本的压缩包
      官网下载地址:https://kafka.apache.org/downloads

    2. 将压缩包上传到安装目录并解压

      tar -zxvf kafka压缩包名称
      
      • 1
    3. 修改kafka的server.properties配置文件
      进入解压后的文件能看到如下内容,server.properties配置文件在config目录下
      在这里插入图片描述
      我这里是非集群,修改配置文件也比较简单,使用vi或者vim文本编辑命令打开server.properties文件,修改下面配置

      #kafka服务器地址
      listeners=PLAINTEXT://10.3.1.156:9092 
      
      • 1
      • 2
      #zk服务器地址,我这里zk跟kafka在一台虚拟机上,如果不在一台机器上,这里对应的ip应该是zk安装机器的ip
      zookeeper.connect=10.3.1.156:2181
      
      • 1
      • 2
      log.dirs=/usr/kafka/kafka-logs
      
      • 1
    4. 非集群方法配置就完成了,下面就是启动kafka服务,进入kafka文件下的bin目录下,使用命令的方式体验消息发送和接收
      启动kafka

      ./kafka-server-start.sh -daemon ../config/server.properties
      
      • 1

      创建topic

      ./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --create --topic test --replication-factor 1 --partitions 1
      
      • 1

      查看topic

      ./kafka-topics.sh --list --bootstrap-server 10.3.1.156:9092
      
      • 1

      发消息

      ./kafka-console-producer.sh --bootstrap-server 10.3.1.156:9092 --topic test
      
      • 1

      接收消息

      ./kafka-console-consumer.sh --bootstrap-server 10.3.1.156:9092 --topic test
      
      • 1

      删除topic

      ./kafka-topics.sh --bootstrap-server 10.3.1.156:9092 --delete --topic test
      
      • 1

    springboot项目整合配置

    1. 新建的项目pom文件中导入kafka依赖
      <dependency>
         <groupId>org.springframework.kafkagroupId>
         <artifactId>spring-kafkaartifactId>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
    2. 配置文件添加kafka相关配置
      server:
        port: 8080
      spring:
        kafka:
          bootstrap-servers: 10.3.1.156:9092 #kafka服务地址
          producer: #生产者
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer: #消费者
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            auto-offset-reset: earliest
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    3. 客户端发送消息
      @RestController
      public class KafkaProducerController {
      
          @Autowired
          private KafkaTemplate<String, String> kafkaTemplate;
      
          @GetMapping(value = "/sentMessage")
          public String sentMessage(@RequestParam("msg") String msg){
              kafkaTemplate.send("test", msg);
              return "发送成功";
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    4. 消费者接收消息
      @Component
      public class KafkaConsumerListener {
          @KafkaListener(topics = "test", groupId = "my-test-group")
          public void consumer(ConsumerRecord<String,String> record){
              System.out.println("接收到的消息:" + record.value());
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    涉及代码地址:代码地址

  • 相关阅读:
    QT之OpenGL 计算机图形学线性代数基本知识
    熊猫烧香病毒,当年究竟有多可怕?
    代码随想录算法训练营第二十五天|216.组合总和III,17. 电话号码的字母组合
    web前端课程设计——动漫网页2个网页HTML+CSS web前端开发技术 web课程设计 网页规划与设计
    【QT】Qt项目demo:数据在ui界面上显示,鼠标双击可弹窗显示具体信息
    strcpy, strcat,strcmp的介绍和模拟实现
    Redis设计与实现之简单动态字符串
    怎么给iPhone手机上的待办事项软件加上密码锁
    conda环境安装opencv带cuda版本
    经验分享,免费商标查询网站
  • 原文地址:https://blog.csdn.net/qq_42494654/article/details/132872639