• CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)


    目录

    一、安装jdk(已安装可以不用管)

    二、下载Kafka

    三、配置Kafka

    四、启动Kafka

    五、SpringBoot连接测试

    一、安装jdk(已安装可以不用管)

    Kafka需要依赖jdk,这里先安装jdk 

    【1】卸载jdk

    有时候会因为各种原因我们需要卸载jdk,这里也记录一下

    1.  # 查看安装的jdk
    2.  rpm -qa|grep -i jdk

    1.  # 卸载jdk(根据自己的包名来进行卸载)
    2.  rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.332.b09-1.el7_9.x86_64
    3.  rpm -e --nodeps java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64
    4.  rpm -e --nodeps java-1.8.0-openjdk-devel-1.8.0.332.b09-1.el7_9.x86_64
    1.  # 卸载后查看jdk版本,发现找不到,卸载成功
    2.  java -version

    【2】安装jdk

    官网下载:Java Downloads | Oracle 找到 tar.gz 结尾的进行下载,上传到服务器进行安装配置

    1.  # 解压(我的解压目录为:/software/jdk/jdk1.8.0_333)
    2.  tar -zxvf jdk-8u333-linux-x64.tar.gz
    3.  ​
    4.  # 打开配置文件进行配置
    5.  vim /etc/profile
    6.  ​
    7.  # 添加如下配置(根据自己的路径来)
    8.  export JAVA_HOME=/software/jdk/jdk1.8.0_333
    9.  export JRE_HOME=/$JAVA_HOME/jre
    10.  export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    11.  export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    12.  ​
    13.  # 使配置文件生效
    14.  source /etc/profile
    15.  ​
    16.  # 查看是否安装成功
    17.  java -version

    二、下载Kafka

    官网:Apache Downloads

    根据对应的版本,使用wget下载,如果下载速度太慢,可以在Windows上通过下载器下载,然后上传到服务器上

     wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz --no-check-certificate

    上传后进行解压

     tar -axvf kafka_2.13-3.2.0.tgz

    注,解压路径不宜过长,否则后面启动的时候可能会报错,所以这里我将kafka_2.13-3.2.0重命名为kafka

     mv kafka_2.13-3.2.0 kafka

    解压后目录结构如下:

    三、配置Kafka

    这里只是单机kafka的配置,并不是集群的配置

    【1】配置kafka的log路径

    打开config目录下的server.properties文件修改配置

    1.  # 打开配置文件
    2.  vim config/server.properties
    3.  ​
    4.  # 配置日志存放位置(启动的时候会自己在相应的目录下创建kafka-logs文件夹)
    5.  log.dirs=/software/kafka/kafka-logs

    【2】配置外网访问

    还是修改server.properties文件,找到下面配置进行修改,位置大概在34行左右

    1.  # 放开注解
    2.  listeners=PLAINTEXT://:9092
    3.  ​
    4.  # 修改注解,改成自己服务器ip
    5.  advertised.listeners=PLAINTEXT://42.104.249.49:9092

    【3】配置zookeeper数据路径

    先在解压目录下创建zookeeper文件夹用来存放数据,再打开config目录下的zookeeper.properties文件修改配置

    1.  # 打开配置文件
    2.  vim config/zookeeper.properties
    3.  ​
    4.  # 配置数据存放位置(启动的时候会自己在相应的目录下创建zookeeper文件夹)
    5.  dataDir=/software/kafka/zookeeper

    【4】放通端口

    1.  # 放通9092端口
    2.  firewall-cmd --zone=public --add-port=9092/tcp --permanent
    3.  ​
    4.  # 重启防火墙
    5.  firewall-cmd --reload

    注意这里除了放通centos的端口,还要将安全组的端口也放通,比如我这里用的是腾讯云,就要上腾讯云的控制台开放相应的安全组端口

    四、启动Kafka

    1.  # 启动zookeeper
    2.  bin/zookeeper-server-start.sh config/zookeeper.properties
    3.  ​
    4.  # 启动kafka
    5.  bin/kafka-server-start.sh config/server.properties
    6.  ​
    7.  # 生产消息(创建名为testTopic的主题)
    8.  bin/kafka-console-producer.sh --topic testTopic --bootstrap-server localhost:9092
    9.  ​
    10.  # 监听消息(重开一个终端监听testTopic主题的消息,在生产消息的终端发消息,此终端收消息)
    11.  bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server localhost:9092

    在监听消息的终端能够收到消息,则说明安装配置成功

    五、SpringBoot连接测试

    目录结构如下:

    【1】引入pom依赖

    1.  <dependency>
    2.      <groupId>org.apache.kafkagroupId>
    3.      <artifactId>kafka-clientsartifactId>
    4.      <version>3.2.0version>
    5.  dependency>
    6.  <dependency>
    7.      <groupId>org.projectlombokgroupId>
    8.      <artifactId>lombokartifactId>
    9.  dependency>

    【2】配置application.yml

    bootstrap-servers配置为自己服务器的IP

    1.  spring:
    2.   kafka:
    3.      # 消费者
    4.     consumer:
    5.       group-id: foo
    6.       auto-offset-reset: earliest
    7.       bootstrap-servers: 42.104.249.49:9092
    8.      # 生产者
    9.     producer:
    10.       bootstrap-servers: 42.104.249.49:9092
    11.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
    12.       value-serializer: org.apache.kafka.common.serialization.StringSerializer

    【3】生产消息

    生产消息通过controller访问接口来进行测试

    1. @RestController
    2.  public class KafkaProducerController {
    3.      
    4.      @Autowired
    5.      private KafkaTemplate kafkaTemplate;
    6.      
    7.      @GetMapping("/send")
    8.      public void send() {
    9.          // testTopic为主题,和上面终端测试的保持一致,方便测试,onestar为要发送的消息
    10.          kafkaTemplate.send("testTopic","onestar");
    11.     }
    12.  }

    【4】监听消费消息

    1. @Component
    2.  @Slf4j
    3.  public class KafkaConsumer {
    4.      /**
    5.       * kafka的监听器,topic为"testTopic",消费者组默认为配置文件里面的
    6.       */
    7.      @KafkaListener(topics = {"testTopic"})
    8.      public void onMessage1(ConsumerRecord consumerRecord) {
    9.          Optional optional = Optional.ofNullable(consumerRecord.value());
    10.          if (optional.isPresent()) {
    11.              Object msg = optional.get();
    12.              log.info("message:{}", msg);
    13.         }
    14.     }
    15.  }

    【5】测试

    为了方便查看,我们可以将上面的监听消息终端开着,然后运行代码,通过访问 http://localhost:8080/send 进行测试

    从打印的日志可以看到消息已经消费了,并且在监听消息的终端也打印出消息

  • 相关阅读:
    ios开发错误积累
    pycharm安装jupyter,用德古拉主题,但是输入行全白了,看不清,怎么办?
    intel 一些偏门汇编指令总结
    【黄啊码】MySQL入门—5、掌握这些数据筛选技能比你学python还有用-2
    day05 51单片机-外部中断、定时器
    初阶数据结构学习记录——여덟 二叉树
    无参数构造器
    MySQL日志系统
    PowerBI真经连续剧
    C# 中的“智能枚举”:如何在枚举中增加行为
  • 原文地址:https://blog.csdn.net/One_L_Star/article/details/125897648