• 2. kafka消息队列


    一、kafka消息队列

    消息服务, 简称MQ
    用于在分布式业务环境,实现不同组件、不同的功能模块的高效通信

    代表性的MQ软件:
    kafka, 十万并发
    RocketMQ 百万并发
    rabbitMQ
    zeroMQ

    二、消息服务的术语

    在这里插入图片描述

    • producer 生产者
      产生消息的进程

    • consumer 消费者
      接收、处理消息的进程

    • broker 消息服务器

    • topics 主题
      消息的分组,根据业务不同的模块建不同的主题

    • partation 分区
      确保某一个主题的消息的有序性

    三、kafka消息确认机制 ACK

    producer发送消息后,leader将消息同步给follower,然后返回ack给producer,表示消息已收到,此时才可以继续发送下一条消息。

    kafka提供了以下3种ack级别:
    0:leader接收到消息马上返回ack,此时可能还没有写入磁盘,可能丢失数据
    1:leader将消息写入磁盘后,马上返回ack,此时可能还没同步follower,同样可能丢失数据
    -1(all):leader和follower都将数据写入磁盘后,返回ack。但是如果在写入磁盘后,ack尚未发送,此时leader发生故障,会导致数据写入重复

    四、kafka安装部署

    1、环境规划

    192.168.140.10 kafka
    192.168.140.11 kafka
    192.168.140.12 kafka

    2、使用事先部署好的zookeeper管理kafka的高可用

    3、安装jdk

    4、安装kafka

    [root@node01 ~]# tar xf kafka_2.12-3.3.1.tgz -C /usr/local/
    [root@node01 ~]# mv /usr/local/kafka_2.12-3.3.1/ /usr/local/kafka33
    
    [root@node01 ~]# vim /etc/profile
    export KAFKA_HOME=/usr/local/kafka33
    export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
    
    [root@node01 ~]# source /etc/profile
    

    5、配置kafka

    [root@node01 ~]# mkdir /usr/local/kafka33/log
    [root@node01 ~]# vim /usr/local/kafka33/config/server.properties
    
    broker.id=0
    
    listeners=PLAINTEXT://192.168.140.10:9092
    log.dirs=/usr/local/kafka33/log
    
    num.network.threads=8
    num.io.threads=16
    
    zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181
    

    另外两台消息服务器配置参考上述,注意修改broker id、监听IP

    6、启动kafka

    [root@node01 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties 
    [root@node01 bin]# 
    [root@node01 bin]# netstat -tunlp | grep 9092
    tcp6       0      0 192.168.140.10:9092     :::*                    LISTEN      12309/java          
    [root@node01 bin]# 
    [root@node01 bin]# 
    

    在zookeeper中查看kafka注册的数据

    [root@node01 bin]#  /usr/local/zookeeper/bin/zkCli.sh
    
    [zk: localhost:2181(CONNECTED) 0] ls /brokers 
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers/ids 
    [0, 1, 2]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids/0
    []
    [zk: localhost:2181(CONNECTED) 3] 
    [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"features":{},"host":"192.168.140.10","timestamp":"1718782930115","port":9092,"version":5}
    
    

    7、测试生产者、消费者模型

    7.1 创建主题

    [root@node01 ~]# kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
    Created topic test.
    [root@node01 ~]# 
    [root@node01 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092
    test
    [root@node01 ~]# 
    

    7.2 测试生产者产生数据

    [root@node01 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test
    >nginx
    >httpd
    >php
    >mysql
    >redis
    

    7.3 测试消费者接收数据

    [root@node01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test --from-beginning
    nginx
    httpd
    php
    mysql
    redis
    
    
  • 相关阅读:
    CentOS7中安装MYSQL5.7数据库
    批量修改文件格式(python代码+exe文件)。
    分布式数据库Cassandra
    git rebase 合并 commit 保持分支干净整洁
    防御DDoS袭击的高防服务器该如何选择?来看这7点
    Python深度学习:融合网络 | LSTM网络和ResNet网络融合 | 含随机生成的训练数据集
    关于小编入坑第512天
    并行与分布式计算 第三章 进程级的并行:MPI编程
    uniapp微信小程序用户隐私保护
    【BUG】vue中@change时间传值丢失问题
  • 原文地址:https://blog.csdn.net/u010198709/article/details/139772826