• kafka入门03——简单实战


    目录

    安装Java

    安装Zookeeper

    安装Kafka

    生产与消费



    主要是记录下Kafka的安装配置过程,前置条件需要安装jdk和zookeeper

    安装Java

    1.Oracle官网下载对应jdk安装包

    2.将本地压缩包上传到虚拟机自定义路径,路径看诸君的习惯,敝人使的/usr/local/java

    使用SSH远程连接工具FinalShell上传jdk压缩包(上传文件也看诸君喜好的SSH连接工具),FinalShell安装下载:【安装教程】SSH远程连接工具-FinalShell的安装_finalshell安装_Summer_may的博客-CSDN博客

    3.解压缩,在压缩包路径下输入: tar -zxvf 上传的jdk压缩包名

    tar -zvxf jdk-8u391-linux-x64.tar.gz

    4.编辑环境变量,打开配置文件 vim /etc/profile 或者 vi /etc/profile。在文件最后添加:

    export JAVA_HOME=/usr/local/java/jdk1.8.0_391
     
    export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
     
    export PATH=$PATH:$JAVA_HOME/bin
    • JAVA_HOME配置成自己的路径

    • 按i进行编辑,完成后按esc退出编辑模式,“shift+:”输入“wq!”保存并退出

    5.刷新全局配置使生效

    cd / #退回到根目录
     
    . /etc/profile #环境变量配置刷新

    最后检查java版本

    java -version

    安装Zookeeper

    1.java确认安装过了,这里直接开始安装zookeeper。

    多一句,自zk3.5.5版本以后,已编译的jar包,尾部有bin,应该使用的是apache-zookeeper-3.8.3-bin.tar.gz。避免报错:“找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain”。(图片来自网络引用)

    官网地址:Apache Download Mirrors

    2.上传压缩包并解压,参照上文中FinalShell的方式。

    cd /usr/local/zookeeper/
    tar -zvxf apache-zookeeper-3.8.3-bin.tar.gz 

    3.重命名zoo_sample.cfg为zoo.cfg

    # cp conf/zoo_sample.cfg conf/zoo.cfg

    zookeeper的server启动脚本使用的配置文件名称是zoo.cfg,不改名会报错

    4.进入bin目录,使用zkServer.sh启动zookeeper,如果报错“-bash: zkServer.sh: command not found”就使用“./zkServer.sh start”

    # zkServer.sh start

    5.查看启动状态

    # zkServer.sh status

    6.使用Cli验证

    # zkCli.sh

    简单操作:

    1.创建节点 create /zkTest myData

    2.查看节点 get /zkTest 或者 get -s /zkTest

    3.删除节点 delete /zkTest

    4.退出cli客户端 quit

    安装Kafka

    1.官网下载压缩包到本地。官网地址:https://kafka.apache.org/downloads

    2.根据上文一样,使用远程连接工具FinalShell上传压缩包进行解压。

    tar -zxvf  kafka_2.12-3.6.0.tgz

    3.确认Kafka相关配置(kafka依赖Zookeeper先启动Zookeeper服务)

    # cd /usr/local/kafka/kafka_2.12-3.5.0/config/
    # vi server.properties 

    4.进入bin目录使用kafka-server-start.sh通过config配置启动Zookeeper服务

    [root@10 bin]# ./kafka-server-start.sh ../config/server.properties 

    5.重新打开一个SSH连接,通过jps命令查看kafka启动状态

    [root@10 kafka_2.12-3.5.0]# jps -l

    生产与消费

    1.进入kafka的bin目录,先来创建个topic验证下

    [root@10 bin]# ./kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-test --replication-factor 3 --partitions 4

    报错:Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option

    原因是:Kafka 版本过高,命令不存在

    修改命令:

    [root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --create --topic topic-test --replication-factor 3 --partitions 4

    还是报错: ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

    原因是:zookeeper使用的单机部署,只有一个broker。创建topic的命令中分区是4,副本是3,超出了数量限制。

    修改命令:

    [root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --create --topic topic-test --replication-factor 1 --partitions 4

    创建topic成功

    2.展示topic信息:

    [root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --describe --topic topic-test

    3.进入kafka的bin目录使用自带的kafka-console-consumer.sh脚本订阅主题topic-test。

    [root@10 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test

    这个时候topic-test没有存入任何消息,所以脚本还不能消费任何消息。

    4.打开一个新的SSH连接,进入kafka的bin目录使用自带的kafka-console-producer.sh脚本发送消息到主题topic-test。

    [root@10 bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test

    发送“Hello kafka,”"Hello,test"

    consumer连接窗口就能消费消息了。

    到此,简单的kafka实战案例就结束了。

  • 相关阅读:
    基于SpringBoot蜗牛兼职网的设计与实现【附PPT|万字文档(LW)和搭建文档】
    洛谷_P1007 独木桥_思维
    Dify配置https协议
    Android调用浏览器打开指定页面
    SQL29 计算用户的平均次日留存率
    第十五届蓝桥杯物联网试题(国赛)
    推荐一个屏幕上鼠标高亮显示的小工具
    带你走过动态表单的那些坑
    AIGC: 2 语音转换新纪元-Whisper技术在全球客服领域的创新运用
    云原生写进上海 “十四五” | 上海市信息服务业行业协会领导一行调研「DaoCloud 道客」
  • 原文地址:https://blog.csdn.net/Elaine2391/article/details/134021134