一、集群准备
node1 192.168.157.128
node1 192.168.157.129
node1 192.168.157.130
二、安装zookeeper集群
三、下载kafka
四、安装kafka
1. 将kafka安装包上传到服务器的/opt/apps目录下
2. 解压
tar -zxvf kafka_2.12-3.2.3.tgz
3. 创建kafka日志目录
- cd kafka_2.12-3.2.3
- mkdir logs
4. 修改kafka配置
编辑config目录下的server.properties文件,在不同的节点只需要更改不同的host.name和broker.id
- # 每个broker在集群中的唯一标识,不能重复
- broker.id=0
- # 端口
- port=9092
- # broker主机地址或者hostname
- host.name=node1
- # broker处理消息的线程数
- num.network.threads=3
- # broker处理磁盘io的线程数
- num.io.threads=8
- # socket发送数据缓冲区
- socket.send.buffer.bytes=102400
- # socket接收数据缓冲区
- socket.receive.buffer.bytes=102400
- # socket接收请求最大值
- socket.request.max.bytes=104857600
- # kafka数据存放目录位置,多个位置用逗号隔开
- log.dirs=/opt/apps/kafka_2.12-3.2.3/logs
- # topic默认的分区数
- num.partitions=1
- # 恢复线程数
- num.recovery.threads.per.data.dir=1
- # 默认副本数
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- # 消息日志最大存储时间,这里是7天
- log.retention.hours=168
- # 每个日志分段文件大小,这里是1g
- log.segment.bytes=1073741824
- # 消息日志文件大小检查间隔时间
- log.retention.check.interval.ms=300000
- # zookeeper集群地址
- zookeeper.connect=192.168.157.128:2181,192.168.157.129:2181,192.168.157.130:2181
- # zookeeper连接超时时间
- zookeeper.connection.timeout.ms=6000
- # 推迟初始消费者再平衡时间。
- group.initial.rebalance.delay.ms=0
5. 启动kafka(先启动zookeeper集群)
- #到bin目录下执行
- #后台启动加参数-daemon
- ./kafka-server-start.sh -daemon ../config/server.properties
6. 测试生产和消费
- #生产消息
- ./kafka-console-producer.sh --broker-list 192.168.157.128:9092,192.168.157.129:9092,192.168.157.130:9092 --topic test-topic
- #消费消息
- ./kafka-console-consumer.sh --bootstrap-server 192.168.157.128:9092,192.168.157.129:9092,192.168.157.130:9092 --topic test-topic


到此kafka集群安装完毕。