• springboot项目整合kafka实现消息队列


    一、Docker镜像、容器准备:

    1.1拉取镜像:

    前提是虚拟机安装了docker,可以自行看其他文章进行安装

    docker pull ubuntu/kafka 
    docker pull zookeeper
    
    • 1
    • 2

    1.2运行容器

    先启动zookeeper容器,因为kafka依赖于zookeeper

    docker run -d \
    	--network zk_k\ #指定网络-p 2181:2181 \ #暴露端口-v /mydata/zookeeper/data:/data  \  #数据挂载,先创建/mydata/zookeeper/data目录--name zookeeper \ #指定容器名称
     	 zookeeper:latest  #镜像名称
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    启动kafka容器

    # 启动 kafka
    docker run -d  -p 9092:9092 \
    --name kafka \
    --network hmall \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.130:2181 \ # 连接上一步运行的zookeeper容器
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.130:9092 \ # kafka服务监听设置,IP为当前主机IP
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # 默认就好了
    -v /mydata/kafka/config:/opt/kafka/config \ # 数据挂载,同样先创建好目录,上一步类似
    -v /mydata/kafka/logs:/opt/kafka/logs \
    ubuntu/kafka:latest
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    本文是单机部署,有兴趣做集群部署的可以参考文章

    二、创建一个springboot项目

    2.1导入kafka依赖

     
     <dependency>
          <groupId>org.springframework.kafkagroupId>
          <artifactId>spring-kafkaartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.2application.properties配置

    先去虚拟机/mydata/kafka/config目录修改consumer.properties文件,如图所示

    在这里插入图片描述

    同样provider.properties也要改

    在这里插入图片描述

    #运行kafka服务的主机IP和kafka端口
    spring.kafka.bootstrap-servers=192.168.31.130:9092
    #组id,在consumer.properties文件修改 
    spring.kafka.consumer.group-id=my-group
    #表示开启消费者自动提交偏移量
    spring.kafka.consumer.enable-auto-commit=true
    #表示自动提交偏移量的时间间隔为 3000 毫秒。
    spring.kafka.consumer.auto-commit-interval=3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.3编写消息生产者

    @Component
    @Slf4j
    public class KafkaProvider {
    	//kafka依赖注入
        @Autowired
        private KafkaTemplate kafkaTemplate;
    	
        public void send(String topic, String message) {
            log.info("发送消息到Kafka topic: {}; 消息内容: {}", topic, message);
            kafkaTemplate.send(topic, message);//执行发送消息
            log.info("生产者消息发送成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.4编写消息消费者

    @Component
    @Slf4j
    public class KafkaConsumer {
    	//对主题topic_name进行监听,当生产者对主题topic_name生产消息,该消费者就会监听到,然后进行消息消费
        @KafkaListener(topics = {"topic_name"})
        //ConsumerRecord 对象存储消息及对消息的处理
        public void receive(ConsumerRecord record){
        	//监听到消息后的业务逻辑处理
            log.info("消费者接收到消息 message: {}", record.value());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.5测试

    @RestController
    public class KafkaController {
    
        @Autowired
        private KafkaProvider kafkaProvider;
    
        @RequestMapping("/send")
        public String send(String topic, String message) {
            kafkaProvider.send(topic,message);
            return "success";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    测试结果如图所示

    浏览器发送消息如图

    在这里插入图片描述

    消费者消费了消息如图

    在这里插入图片描述

    生产者生产消息如图

    在这里插入图片描述

    踩坑

    如果没有修改配置文件的server.properties里的advertised.listeners=PLAINTEXT://192.168.31.130:9092就报如图所示的错

    在这里插入图片描述

  • 相关阅读:
    电源ATE自动测试系统为您提供一站式自动化测试解决方案
    js数据结构(队列Queue)
    LeetCode——动态规划篇(一)
    ODC现已开源:与开发者共创企业级的数据库协同开发工具
    程序员短视频上瘾综合症
    QT读取Excel表格内容到Table Widget
    [附源码]计算机毕业设计面包连锁店管理系统Springboot程序
    一Vue基础:绑定样式、条件渲染、列表渲染、列表过滤
    拓扑排序(一部分)
    正则表达式从理论到时间
  • 原文地址:https://blog.csdn.net/qq_43634655/article/details/138002654