• Kafka 单机和集群环境部署教程


    下面是 Apache Kafka 单机和集群环境部署的详细教程,包括部署过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处理平台,广泛用于实时数据处理、日志收集、消息队列等场景。


    一、Kafka 单机环境部署

    1. 环境准备

    • 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
    • Java:Kafka 需要 Java 环境,推荐使用 OpenJDK 8 或 11。
    • ZooKeeper:Kafka 依赖 ZooKeeper 进行分布式协调。

    2. 安装 Java

    在 Ubuntu 中:

    sudo apt update
    sudo apt install openjdk-11-jdk
    

    在 CentOS 中:

    sudo yum install java-11-openjdk
    

    验证 Java 安装:

    java -version
    

    3. 安装 ZooKeeper

    Kafka 使用 ZooKeeper 进行节点管理和协调,需要先安装并启动 ZooKeeper。

    3.1 下载并解压 ZooKeeper
    wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
    tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
    mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper
    
    3.2 配置 ZooKeeper
    1. 创建数据目录:

      mkdir -p /var/lib/zookeeper
      
    2. 复制配置文件:

      cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
      
    3. 编辑配置文件 /usr/local/zookeeper/conf/zoo.cfg

      dataDir=/var/lib/zookeeper
      clientPort=2181
      
    3.3 启动 ZooKeeper
    /usr/local/zookeeper/bin/zkServer.sh start
    
    3.4 验证 ZooKeeper 是否正常运行
    /usr/local/zookeeper/bin/zkCli.sh -server localhost:2181
    

    在连接成功后输入 ls /,若返回空列表([]),则说明连接成功。

    4. 安装 Kafka

    4.1 下载并解压 Kafka

    访问 Kafka 官网 下载最新版本的 Kafka。

    wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
    tar -xzvf kafka_2.12-3.5.0.tgz
    mv kafka_2.12-3.5.0 /usr/local/kafka
    
    4.2 配置 Kafka

    编辑 Kafka 的配置文件 /usr/local/kafka/config/server.properties

    # Kafka Broker ID,唯一标识符
    broker.id=0
    
    # 监听的接口和端口
    listeners=PLAINTEXT://:9092
    
    # 日志文件存储路径
    log.dirs=/var/lib/kafka-logs
    
    # Zookeeper 连接地址
    zookeeper.connect=localhost:2181
    
    4.3 创建日志目录
    mkdir -p /var/lib/kafka-logs
    
    4.4 启动 Kafka Broker
    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    
    4.5 验证 Kafka 是否正常运行

    创建一个测试 Topic:

    /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    

    列出 Topic:

    /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    

    你应该看到 test-topic 在列出的 Topic 中。

    5. Kafka 单机部署的注意事项

    • ZooKeeper:确保 ZooKeeper 正常运行,并且 zookeeper.connect 地址配置正确。
    • 内存和存储:为 Kafka 分配足够的内存和存储空间,尤其是在高负载场景下。
    • 日志文件:定期检查和清理 Kafka 日志文件,以防止磁盘占满。
    • 监听地址:如果需要远程访问,确保 listeners 配置了正确的监听地址。
    • 防火墙设置:确保防火墙开放了 Kafka 和 ZooKeeper 使用的端口(默认 9092 和 2181)。

    二、Kafka 集群环境部署

    Kafka 集群由多个 Kafka Broker 组成,能够提供高可用性和水平扩展。

    1. 环境准备

    • 多台服务器:至少 3 台(3 个 Kafka Broker 和 3 个 ZooKeeper 实例)
    • 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
    • Java:在所有节点上安装 Java

    2. 安装 ZooKeeper 集群

    在每台服务器上按照单机部署的步骤安装 ZooKeeper,并进行以下配置:

    2.1 配置 ZooKeeper 节点 ID

    编辑每个节点的 zoo.cfg 文件,添加如下配置:

    server.1=zookeeper1:2888:3888
    server.2=zookeeper2:2888:3888
    server.3=zookeeper3:2888:3888
    

    在每台服务器上创建 myid 文件,用于标识节点:

    echo "1" > /var/lib/zookeeper/myid  # 在 zookeeper1 上
    echo "2" > /var/lib/zookeeper/myid  # 在 zookeeper2 上
    echo "3" > /var/lib/zookeeper/myid  # 在 zookeeper3 上
    
    2.2 启动 ZooKeeper 集群

    在每台服务器上启动 ZooKeeper:

    /usr/local/zookeeper/bin/zkServer.sh start
    

    3. 安装 Kafka 集群

    在每台服务器上按照单机部署的步骤安装 Kafka,并进行以下配置:

    3.1 配置 Kafka Broker

    编辑每个节点的 server.properties 文件,添加如下配置:

    broker.id=0  # 每个 Broker 唯一 ID
    listeners=PLAINTEXT://:9092
    log.dirs=/var/lib/kafka-logs
    zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    
    3.2 启动 Kafka Broker

    在每台服务器上启动 Kafka Broker:

    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    

    4. 验证 Kafka 集群状态

    4.1 创建 Topic

    在任一 Kafka Broker 上执行以下命令:

    /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 3
    
    4.2 验证 Topic

    列出集群中的 Topic:

    /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
    

    查看 Topic 详细信息:

    /usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:9092
    

    5. Kafka 集群部署的注意事项

    • ZooKeeper 集群:确保每个节点配置了正确的 myid,并且所有节点可以互相通信。
    • Kafka Broker 配置:每个 Broker 必须有唯一的 broker.id
    • 分区和副本:根据实际需求配置合适的分区数和副本数,以提高数据可靠性和吞吐量。
    • 监控和报警:使用 Kafka Manager 或其他监控工具监控集群状态,及时处理故障。
    • 网络配置:确保各节点之间的网络连接正常,并且防火墙开放了必要端口。
    • 资源规划:为 Kafka 和 ZooKeeper 分配足够的 CPU、内存和磁盘资源。

    三、Kafka 使用案例:生产者和消费者

    1. 使用 Java 实现 Kafka 生产者和消费者

    1.1 添加依赖

    在 Maven 项目中添加 Kafka 的依赖:

    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-clientsartifactId>
        <version>3.5.0version>
    dependency>
    
    1.2 编写 Kafka 生产者
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class SimpleProducer {
    
        public static void main(String[] args) {
    
            // Kafka 生产者配置
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig
    
    .KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // 创建生产者
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            // 发送消息
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "Message " + i);
                producer.send(record);
            }
    
            // 关闭生产者
            producer.close();
        }
    }
    
    1.3 编写 Kafka 消费者
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class SimpleConsumer {
    
        public static void main(String[] args) {
    
            // Kafka 消费者配置
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            // 创建消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));
    
            // 轮询消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
    
    1.4 运行 Java 程序

    编译并运行生产者:

    mvn compile
    mvn exec:java -Dexec.mainClass="SimpleProducer"
    

    编译并运行消费者:

    mvn exec:java -Dexec.mainClass="SimpleConsumer"
    

    2. 使用 Python 实现 Kafka 生产者和消费者

    2.1 安装 Kafka 库
    pip install kafka-python
    
    2.2 编写 Kafka 生产者
    from kafka import KafkaProducer
    
    # 创建 Kafka 生产者
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    
    # 发送消息
    for i in range(10):
        producer.send('test-topic', key=bytes(str(i), encoding='utf-8'), value=bytes(f'Message {i}', encoding='utf-8'))
    
    # 关闭生产者
    producer.close()
    
    2.3 编写 Kafka 消费者
    from kafka import KafkaConsumer
    
    # 创建 Kafka 消费者
    consumer = KafkaConsumer(
        'test-topic',
        bootstrap_servers='localhost:9092',
        group_id='test-group',
        auto_offset_reset='earliest'
    )
    
    # 轮询消息
    for message in consumer:
        print(f'Offset = {message.offset}, Key = {message.key.decode()}, Value = {message.value.decode()}')
    
    2.4 运行 Python 程序

    运行生产者:

    python kafka_producer.py
    

    运行消费者:

    python kafka_consumer.py
    

    3. 注意事项

    • 生产者和消费者配置:合理配置 bootstrap.serverskey.serializervalue.serializergroup.id 等参数。
    • 分区策略:在生产者中使用自定义分区策略,可以提高吞吐量和负载均衡。
    • 消费组:多个消费者实例可以组成一个消费组,以提高处理能力。
    • 容错机制:在实际应用中,需要考虑重试、错误处理和幂等性等问题。

    总结

    通过以上步骤,我们成功部署了 Kafka 单机和集群环境,并实现了一个简单的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,适合用于实时流处理和数据管道。

    部署过程中的注意事项

    • Java 版本:确保安装了正确版本的 Java。
    • ZooKeeper 集群:确保 ZooKeeper 集群稳定运行,并配置正确。
    • 网络配置:各节点之间的网络连接需要稳定,端口要开放。
    • 资源配置:根据业务需求配置合适的内存、CPU 和磁盘资源。
    • 数据安全:启用 Kafka 的 SSL/TLS 和 SASL 认证机制,确保数据安全传输。
    • 监控和管理:使用 Kafka Manager、Prometheus 等工具监控集群状态,及时处理异常。
    • 日志管理:定期检查和清理 Kafka 的日志,以防止磁盘空间不足。

    通过合理的配置和优化,Kafka 可以为应用程序提供可靠的消息传递和流处理服务,是构建实时数据管道和事件驱动架构的重要组件。

  • 相关阅读:
    python,迪卡尔象限中画点
    超详细Python教程——修改和增加类属性
    解答嵌入式和单片机的关系
    Python 树表查找_千树万树梨花开,忽如一夜春风来(二叉排序树、平衡二叉树)
    服务端Skynet(一)——源码浅析
    实验6.1 python文件应用
    Spring之bean的生命周期
    Spark Streaming状态管理函数updateStateByKey和mapWithState
    nacos 拉取配置失败
    深入了解Eureka:微服务架构中的服务发现与注册中心
  • 原文地址:https://blog.csdn.net/qq_42568323/article/details/141072050