• kafka集群部署


    kafka单机安装: kafka单机安装_Lucifer Zhao的博客-CSDN博客

    1、CentOS Kafka 集群环境安装

    1.1、安装 Kafka

    和单机版安装一样,分别 192.168.1.101、192.168.1.102、192.168.1.103 三台机器安装好 Kafka 环境。

    具体安装过程,参考 kafka单机安装_Lucifer Zhao的博客-CSDN博客

    1.2、修改配置文件

    分别修改 192.168.1.101、192.168.1.102、192.168.1.103 这 3 台机器上的配置文件

    1. cd /usr/local/kafka/config
    2. vim server.properties

    修改配置文件中的 broker.id 分别为 101、102、103

    broker.id=101

    listeners 分 别 设 置 为 PLAINTEXT://192.168.1.101:9092 、 PLAINTEXT://192.168.1.102:9092 、 PLAINTEXT://192.168.1.103:9092

    listeners=PLAINTEXT://192.168.1.101:9092

    三台机器的 zookeeper.connect 都修改为以下内容:

    zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

    1.3、启动 3 个服务

    1.3.1、分别启动 ZK

    [root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &

    如果启动zookeeper时报错nohup

    1. [root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &
    2. [1] 1579
    3. [root@rmq101 kafka_2.13-3.2.0]# nohup: 忽略输入重定向错误到标准输出端

    修改命令,在&前加上 2>&1

    1. [root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup 2>&1 &
    2. [2] 1929

    检查zookeeper是否启动成功:

    ps -ef|grep zookeeper

    1.3.2、再分别启动 kafka

    [root@rmq103 kafka_2.13-3.2.0]# ./bin/kafka-server-start.sh -daemon ./config/server.properties

    如果启动有问题,通过下面命令启动kafka

    [root@rmq102 kafka_2.13-3.2.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1

    通过 ps -ef|grep kafka 命令可以查看kafka是否启动成功

    PS:如果遇到 zk node exists 的问题,先把 brokers 节点删掉(临时解决方案)。

    1.4、集群下创建 Topic

    在 bin 目录下, 创建一个名为 lucifer-topic1 的 topic,只有一个副本,一个分区:

    [root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --create --topic lucifer-topic1 --bootstrap-server 192.168.1.101:9092

    查看已经创建的 topic:

    [root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --describe --topic lucifer-topic1

    1.5、集群下启动 Consumer

    在一个新的远程窗口中:

    1. [root@rmq101 ~]# cd /usr/local/kafka_2.13-3.2.0/bin
    2. [root@rmq101 bin]# sh kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 --topic lucifer-topic1 --from-beginning

    1.6、集群下启动 Producer

    打开一个新的窗口,在 kafka 解压目录下bin子目录下:

    [root@rmq103 bin]# sh kafka-console-producer.sh --broker-list 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 --topic lucifer-topic1

    1.7、集群下 Producer 窗口发送消息

    在生产者窗口(第四个命令窗口为生产者)输入 hello world 回车

    2、基于 Canal 和 Kafka 实现数据同步

    2.1 背景介绍

    Canal 的作用:把自己“伪装”成一个 MySQL 的 Slave,不停同步 Master 的 binlog 数据,再把 binlog 数据以 TCP 或者 MQ 的方式(支持 Kafka、RabbitMQ、RocketMQ)发送给需要同步数据的项目

    canal 项目地址: https://github.com/alibaba/canal

    本案例中,我们需要同步的目标数据库是 192.168.1.102 上部署的数据库。 

    2.2 在目标数据库上创建用户和数据库

    注意 145 上的数据库首先要开启 binlog,binlog-format 必须是 ROW

    1. server-id=1
    2. log-bin=/usr/local/mysql/log-bin/mysql-bin
    3. binlog-format=ROW

    用户和数据库创建

    1. -- 创建 canal 专用的用户,用于访问 master 获取 binlog
    2. CREATE USER canal IDENTIFIED BY '123456';
    3. -- 给 canal 用户分配查询和复制的权限
    4. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@'%';
    5. -- 刷新权限
    6. FLUSH PRIVILEGES;
    7. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
    8. -- 创建测试数据库
    9. CREATE DATABASE `canaltest` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;

    2.3 在192.168.1.101上安装 ZK 和 Kafka

    参考文档:kafka单机安装_Lucifer Zhao的博客-CSDN博客

    2.4 下载安装 Canal

    以安装目录:/usr/local/canal 为例。 用目前稳定版本 1.1.4

    1. cd /usr/local/
    2. mkdir canal
    3. cd canal
    4. wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    5. tar -zxvf canal.deployer-1.1.4.tar.gz

    如果下载慢的话,最好先下载到本地,再上传到linux服务器

    需要修改的配置项: conf/canal.properties

    1. canal.serverMode=kafka
    2. canal.mq.servers = 192.168.1.101:9092

    example/instance.properties

    1. canal.instance.master.address=192.168.1.102:3306
    2. canal.instance.dbUsername=canal
    3. canal.instance.dbPassword=123456
    4. # 新增
    5. canal.instance.defaultDatabaseName=canaltest
    6. # 这个 topic 会自动创建
    7. canal.mq.topic=canal-topic

    在 bin 目录下启动 canal

    1. sh startup.sh
    2. # 查看实例日志
    3. tail -100f /usr/local/canal/logs/canal/canal.log

    2.5 建表测试

    在 canaltest 数据库随便建一张表,做增删改的操作。 在 kafka 服务器上消费这个 topic

    ./kafka-console-consumer.sh --bootstrap-server 192.168.8.147:9092 --topic canal-topic

    可以成功消费到 canal 发送的消息

  • 相关阅读:
    对Excel表中归类的文件夹进行自动分类
    《白皮书》:人脸识别系统的组成及面临的安全风险
    C语言中文网 - Shell脚本 - 8
    C# SolidWorks二次开发---工程图简单版标注长宽
    cmip6数据处理、统计降尺度、动力降尺度、及应用时的各种问题
    【毕业设计】深度学习身份证识别系统 - 机器视觉 python
    Node.js -- fs模块
    KubeSphere介绍和基于K8S的安装
    【Matplotlib】学术论文黑白柱状图绘制
    Kotlin中reified 关键字
  • 原文地址:https://blog.csdn.net/luciferlongxu/article/details/126190849