• Kafka:C++ 实践


    1、kafka 集群搭建

    设置 kafka 集群下有三台主机:192.168.88.131,192.168.88.132

    1.1、kafka 安装配置

    安装 jdk8

    # 安装 jdk8
    tar -zxvf jdk-8u291-linux-x64.tar.gz
    # 将解压后的文件移动到 /usr/lib 目录下
    mkdir /usr/lib/jdk
    mv jdk1.8.0_291 /usr/lib/jdk/
    
    # 配置 java 环境变量
    # /etc/profile,为所有用户配置 jdk 环境
    vim /etc/profile
    # 添加内容
    export JAVA_HOME=/usr/lib/jdk/jdk1.8.0_291
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=${JAVA_HOME}/bin:$PATH
    
    # 配置生效
    source /etc/profile
    # 测试
    java -version
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    安装 kafka

    # 下载 kafka
    wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
    # 安装 kafka
    tar -zxvf kafka_2.11-2.0.0.tgz
    
    • 1
    • 2
    • 3
    • 4

    1.2、zookeeper 配置

    kafka2.0 版本自带了 zookeeper,3.0 以上需要自行安装 zookeeper。

    • bin 目录:zookeeper-server-start.sh,zookeeper-server-stop.sh
    • config 目录:zookeeper.properties

    修改所有节点的 server.properties

    # 修改 broker.id,四台机器分别配置 0 1 2 3,不能重复,-1自动分配
    broker.id=0
    # 配置对应的 zookeeper 地址,可以配置多个 ip1:port1,ip2:port2 
    zookeeper.connect=192.168.88.131:2181
    # 修改日志路径(tmp目录下次重启丢失),商业用自定义路径
    log.dirs=/tmp/kafka-logs
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    启动 zookeeper,因为我只设置一个,所以只启动节点 101.7.141.229

    cd kafka_2.11-2.0.0/bin
    # 后台运行
    sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
    # 测试
    lsof -i:2181
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3、kafka 安装配置

    启动所有节点的 kafka

    # 前台启动查看报错信息
    # 后台启动
    sh kafka-server-start.sh -daemon ../config/server.properties
    # 后台停止
    sh kafka-server-stop.sh -daemon ../config/server.properties
    # 测试
    lsof -i:9092
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.4、测试 Kafka 集群

    创建主题

    # 创建主题
    sh kafka-topics.sh --create --zookeeper 192.168.88.131:2181  -replication-factor 2 --partitions 2 --topic kafka-2
    
    # 查看主题
    sh kafka-topics.sh --describe --zookeeper 192.168.88.131:2181 --topic kafka-2
    
    # 删除主题
    sh kafka-topics.sh --zookeeper 192.168.88.131:2181 --delete --topic kafka-2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    测试集群:开启一个生产者,两个消费者(同属一个消费者组),消费者轮流收到生产者发送的数据。

    # 生产者
    sh kafka-console-producer.sh --broker-list 192.168.88.131:9092 --topic kafka-2
    
    # 消费者
    sh kafka-console-consumer.sh --bootstrap-server 192.168.88.131:9092 --topic kafka-2 --group 0 --from-beginning
    sh kafka-console-consumer.sh --bootstrap-server 192.168.88.132:9092 --topic kafka-2 --group 0 --from-beginning
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    报错问题:Can’t resolve address

    # 修改 hosts
    vim /etc/hosts
    # 添加其他节点的 host:ip 地址映射
    127.0.1.1       Primrose # 本机
    192.168.88.131  Olberic  # 其他节点
    
    # 查看主机名
    hostname
    # 永久修改主机名
    vi /etc/hostname
    vi /etc/hosts
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1.5、安装 librdkafka

    librdkafka

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    git checkout v1.7.0
    ./configure
    make
    sudo make install
    sudo ldconfig
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、生产者

    2.1、生产逻辑

    • 配置生产者客户端参数并创建对应的生产者实例
    • 构建待发送的消息
    • 发送消息:librdkafka 只提供的异步的生产接口
    • 关闭生产者实例

    必要参数

    • bootstrap.servers:指定连接 Kafka 集群所需要的 broker 地址列表。并不需要设置所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息,建议设置两个以上的 broker 地址信息,当任意一个宕机,生产者仍然可以连接到 kafka 集群。

    2.2、代码实现

    KafkaProducer.h

    #ifndef KAFKAPRODUCER_H
    #define KAFKAPRODUCER_H
    
    #pragma once
    
    #include 
    #include 
    #include 
    
    // 生产者投递报告回调
    class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
      public:
        void dr_cb(RdKafka::Message &message) {
            // 发送出错的回调
            if (message.err()) {
                std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
            } 
            // 发送正常的回调
            // Message delivered to topic test [2] at offset 4169
            else {  
                std::cout << "Message delivered to topic " << message.topic_name()
                          << " [" << message.partition() << "] at offset "
                          << message.offset() << std::endl;  
            }
        }
    };
    
    // 生产者事件回调函数
    class ProducerEventCb : public RdKafka::EventCb {
      public:
        void event_cb(RdKafka::Event &event) {
            switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cout << "RdKafka::Event::EVENT_ERROR: "
                          << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS: 
                std::cout << "RdKafka::Event::EVENT_STATS: " << event.str()
                          << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG: 
                std::cout << "RdKafka::Event::EVENT_LOG " << event.fac()
                          << std::endl;
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cout << "RdKafka::Event::EVENT_THROTTLE "
                          << event.broker_name() << std::endl;
                break;
            }
        }
    };
    
    // 生产者自定义分区策略回调:partitioner_cb
    class HashPartitionerCb : public RdKafka::PartitionerCb {
      public:
        // @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL
        // @return 返回分区,(0, partition_cnt)
        int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                               int32_t partition_cnt, void *msg_opaque) {
            char msg[128] = {0};
            // 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cnt
            int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
            // 输出:[topic][key][partition_cnt][partition_id],例 [test][6419][2][1]
            sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s], partition_cnt:[%d], partition_id:[%d]",
                    topic->name().c_str(), key->c_str(), partition_cnt, partition_id);
            std::cout << msg << std::endl;
            return partition_id;
        }
    
      private:
        // 自定义哈希函数 
        static inline unsigned int generate_hash(const char *str, size_t len) {
            unsigned int hash = 5381;
            for (size_t i = 0; i < len; i++)
                hash = ((hash << 5) + hash) + str[i];
            return hash;
        }
    };
    
    class KafkaProducer {
      public:
        /**
         * @brief KafkaProducer
         * @param brokers
         * @param topic
         * @param partition
         */
        explicit KafkaProducer(const std::string &brokers, const std::string &topic,
                               int partition);
        /**
         * @brief push Message to Kafka
         * @param str, message data
         */
        void pushMessage(const std::string &str, const std::string &key);
        ~KafkaProducer();
    
      protected:
        std::string m_brokers;          // Broker 列表,多个使用逗号分隔
        std::string m_topicStr;         // Topic 名称
        int m_partition;                // 分区
        RdKafka::Conf *m_config;        // Kafka Conf 对象
        RdKafka::Conf *m_topicConfig;   // Topic Conf 对象
    
        RdKafka::Topic *m_topic;              // Topic对象
        RdKafka::Producer *m_producer;        // Producer对象
        RdKafka::DeliveryReportCb *m_dr_cb;   // 设置传递回调
        RdKafka::EventCb *m_event_cb;         // 设置事件回调
        RdKafka::PartitionerCb *m_partitioner_cb; // 设置自定义分区回调
    };
    
    #endif // KAFKAPRODUCER_H
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    KafkaProducer.cpp

    #include "KafkaProducer.h"
    
    // 构造生产者
    KafkaProducer::KafkaProducer(const std::string &brokers, const std::string &topic, int partition) {
        m_brokers = brokers;
        m_topicStr = topic;
        m_partition = partition;
    
        RdKafka::Conf::ConfResult errCode;      // 创建错误码
        std::string errorStr;                   // 返回错误信息   
    
        // 创建配置对象
        // 1.1、创建 Kafka Conf 对象
        m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        if (m_config == NULL) {
            std::cout << "Create RdKafka Conf failed." << std::endl;
        }
    
        // 设置 Broker 属性       
        // (必要参数)指定 broker 地址列表。格式:host1:port1,host2:port2,...
        errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
    
        // 设置生产者投递报告回调
        m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调
        errCode = m_config->set("dr_cb", m_dr_cb, errorStr);    // 异步方式发送数据
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
    
        // 设置生产者事件回调
        m_event_cb = new ProducerEventCb; // 创建生产者事件回调
        errCode = m_config->set("event_cb", m_event_cb, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
    
        // 设置数据统计间隔
        errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
    
        // 设置最大发送消息大小
        errCode = m_config->set("message.max.bytes", "10240000", errorStr);
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
    
        // 2、创建 Topic Conf 对象
        m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        if (m_topicConfig == NULL) {
            std::cout << "Create RdKafka Topic Conf failed." << std::endl;
        }
    
        // 设置生产者自定义分区策略回调
        m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调
        errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed:" << errorStr << std::endl;
        }
        
        // 2、创建对象
        // 2.1、创建 Producer 对象,可以发布不同的主题
        m_producer = RdKafka::Producer::create(m_config, errorStr);
        if (m_producer == NULL) {
            std::cout << "Create Producer failed:" << errorStr << std::endl;
        }
    
        // 2.2、创建 Topic 对象,可以创建多个不同的 topic 对象
        m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
        // m_topic2 =RdKafka::Topic::create(m_producer, m_topicStr2, m_topicConfig2, errorStr);
        if (m_topic == NULL) {
            std::cout << "Create Topic failed:" << errorStr << std::endl;
        }
    }
    
    // 发送消息
    void KafkaProducer::pushMessage(const std::string &str,const std::string &key) {
        int32_t len = str.length();
        void *payload = const_cast<void *>(static_cast<const void *>(str.data()));
        
        // produce 方法,生产和发送单条消息到 Broker
        // 如果不加时间戳,内部会自动加上当前的时间戳
        RdKafka::ErrorCode errorCode = m_producer->produce(
            m_topic,                      // 指定发送到的主题
            RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
                                          // partitioner_cb的回调选择合适的分区
            RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
            payload,                        // 消息本身
            len,                            // 消息长度
            &key,                           // 消息key
            NULL // an optional application-provided per-message opaque pointer
                 // that will be provided in the message delivery callback to let
                 // the application reference a specific message
        );
        // 轮询处理
        m_producer->poll(0);
        if (errorCode != RdKafka::ERR_NO_ERROR) {
            std::cerr << "Produce failed: " << RdKafka::err2str(errorCode)
                      << std::endl;
            // kafka 队列满,等待 100 ms
            if (errorCode == RdKafka::ERR__QUEUE_FULL) {
                m_producer->poll(100);
            }
        }
    }
    
    // 析构生产者
    KafkaProducer::~KafkaProducer() {
        while (m_producer->outq_len() > 0) {
            std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
            m_producer->flush(5000);
        }
        delete m_config;
        delete m_topicConfig;
        delete m_topic;
        delete m_producer;
        delete m_dr_cb;
        delete m_event_cb;
        delete m_partitioner_cb;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124

    3、消费者

    3.1、消费逻辑

    • 配置消费者客户端参数和创建相应的消费者实例
    • 订阅主题和分区
    • 拉取消息并消费
    • 位移提交
    • 关闭消费者实例

    必要参数

    • bootstrap.servers:指定 broker 地址列表

    • group.id:指定消费者组 id

    • auto.offset.reset:偏移量,新来消费者的消费起始位置。旧消费者从 offset 处开始消费

      • earliest:从头开始消费
      • latest:从最新的数据开始消费

    3.2、位移提交

    消费者需要向 kafka 提交位移,用来表示消费者的消费进度,这样当消费者宕机重启后,就能从读取之前的位移处继续消费,从而避免整个消费过程再来一遍。由于消费者能够同时消费多个分区,所以位移提交实际上是在分区粒度上进行的,消费者需要为分配给它的每个分区提交各自的位移数据。

    从用户的角度来说,位移提交分为自动提交和手动提交。从消费者端的角度来说,位移提交分为同步提交和异步提交。

    3.2.1、自动提交

    自动提交默认全部为同步提交。当开启自动提交后,kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此能保证不出现消费丢失的情况。

    • enable.auto.commit (bool):默认 true,自动定时提交消费者 offset。
    • auto.commit.interval.ms(int): 自动提交 offset 的间隔毫秒数。默认值为:5000。

    但是,自动提交存在一个问题,可能会出现重复消费。

    3.2.2、手动提交

    手动提交分为同步提交和异步提交两种方式,同步提交阻塞重试,异步提交非阻塞不重试(重试时提交的位移值早已过期)。同时结合两种方式,可以实现异步无阻塞式的位移管理,也确保了 消费者位移的正确性

    • 对于阶段性的手动提交,调用 commitAsync 避免程序阻塞
    • 在消费者关闭前,调用 commitSync 执行同步阻塞的位移提交,保证消费者关闭前能够保存正确的位移数据
    while (true) {
        RdKafka::Message *msg = m_consumer->consume(1000); 
        // 消费消息
        msg_consume(msg, NULL);
        
        // 开启手动提交
        // 1、异步提交
        m_consumer->commitAsync(); 
        delete msg;
    }
    // 2、同步提交,Consumer 要关闭前调用
    m_consumer->commitSync(); 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3.2.3、reblance
    • 手动提交,当集群满足 reblance 的条件时,集群会直接 reblance,不会等待所有消息被消费完,这会导致所有未被确认的消息会重新被消费,会出现重复消费的问题
    • 自动提交,当集群满足 reblance 的条件时,集群不会马上 reblance,而是会等待所有消费者消费完当前消息,或者等待消费者超时,然后 reblance。

    3.3、Rebalance 机制

    当 kafka 遇到下面四种情况,消费者组触发 rebalance 机制

    • 消费组成员发生了变更(宕机、加入)
    • 消费者无法在指定的时间之内完成消息的消费
    • 订阅的主题发生了变化
    • 订阅的主题的分区发生了变化

    后两者都是运维的主动操作,其引发的 rebalance 不可避免。但是前两者 rebalance 的原因是消费者心跳超时和消费者消费数据超时。因此,可以通过适当调参,一定程度减少 rebalance。

    对于消费者组成员变更(消费者心跳超时),要保证消费者实例在被判定为 dead 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

    session.timeout.ms = 6s 		// consumer 向 broker 发送心跳的超时时间
    heartbeat.interval.ms = 2s 	     // consumer 每次向 broker 发送心跳的时间间隔。
    
    • 1
    • 2

    对于消费者消费超时,一般是增加消费者处理的时间,减少每次处理的消息数,阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力

    max.poll.records < 单个线程每秒消费的条数 * 消费线程的个数 * session.timeout的秒数

    max.poll.interval.ms  // 消费者每两次 poll 消息的时间间隔,也就是消费者处理的时间
    max.poll.records	  // 消费者每次处理的消息数
    
    • 1
    • 2

    3.2、代码实现

    KafkaConsumer.h

    #ifndef KAFKACONSUMER_H
    #define KAFKACONSUMER_H
    
    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 设置事件回调
    class ConsumerEventCb : public RdKafka::EventCb {
      public:
        void event_cb(RdKafka::Event &event) {
            switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal()) {
                    std::cerr << "FATAL ";
                }
                std::cerr << "ERROR (" << RdKafka::err2str(event.err())
                          << "): " << event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_STATS:
                std::cerr << "\"STATS\": " << event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_LOG:
                fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(),
                        event.fac().c_str(), event.str().c_str());
                break;
    
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "THROTTLED: " << event.throttle_time() << "ms by "
                          << event.broker_name() << " id " << (int)event.broker_id()
                          << std::endl;
                break;
    
            default:
                std::cerr << "EVENT " << event.type() << " ("
                          << RdKafka::err2str(event.err()) << "): " << event.str()
                          << std::endl;
                break;
            }
        }
    };
    
    // 设置消费者组再平衡回调
    // 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
    class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
      private:
        // 打印当前获取的分区
        static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions) 
        {
            for (unsigned int i = 0; i < partitions.size(); i++) {
                std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
            }
            std::cerr << "\n";
        }
    
      public:
        // 消费者组再平衡回调
        void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                          std::vector<RdKafka::TopicPartition *> &partitions) {
            std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
            printTopicPartition(partitions);
            
            // 分区分配成功
            if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
                // 消费者订阅这些分区
                consumer->assign(partitions);
                // 获取消费者组本次订阅的分区数量,可以属于不同的topic
                partition_count = (int)partitions.size();
            } 
            // 分区分配失败
            else {
                // 消费者取消订阅所有的分区
                consumer->unassign();
                // 消费者订阅分区的数量为0
                partition_count = 0;
            }
        }
    
      private:
        int partition_count;    // 消费者组本次订阅的分区数量
    };
    
    class KafkaConsumer {
      public: /**
               * @brief KafkaConsumer
               * @param brokers
               * @param groupID
               * @param topics
               * @param partition
               */
        explicit KafkaConsumer(const std::string &brokers,
                               const std::string &groupID,
                               const std::vector<std::string> &topics,
                               int partition);
        void pullMessage();
        ~KafkaConsumer();
    
      protected:
        std::string m_brokers;
        std::string m_groupID;
        std::vector<std::string> m_topicVector;
        int m_partition;
        RdKafka::Conf *m_config;
        RdKafka::Conf *m_topicConfig;
        RdKafka::KafkaConsumer *m_consumer;
        RdKafka::EventCb *m_event_cb;
        RdKafka::RebalanceCb *m_rebalance_cb;
    };
    
    #endif // KAFKACONSUMER_H
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116

    KafkaConsumer.cpp

    #include "KafkaConsumer.h"
    
    // 构造消费者
    KafkaConsumer::KafkaConsumer(const std::string &brokers,
                                 const std::string &groupID,
                                 const std::vector<std::string> &topics,
                                 int partition) {
        m_brokers = brokers;
        m_groupID = groupID;
        m_topicVector = topics;
        m_partition = partition;
    
        std::string errorStr;
        RdKafka::Conf::ConfResult errorCode;
        
        // 1、创建配置对象
        // 1.1、构造 consumer conf 对象
        m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    
        // 必要参数1:指定 broker 地址列表
        errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
        // 必要参数2:设置消费者组 id
        errorCode = m_config->set("group.id", m_groupID, errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 设置事件回调
        m_event_cb = new ConsumerEventCb;
        errorCode = m_config->set("event_cb", m_event_cb, errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 设置消费者组再平衡回调
        m_rebalance_cb = new ConsumerRebalanceCb;
        errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
        errorCode = m_config->set("enable.partition.eof", "false", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 每次最大拉取的数据大小
        errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 设置分区分配策略:range、roundrobin、cooperative-sticky
        errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 心跳探活超时时间
        errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 心跳保活间隔
        errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 1.2、创建 topic conf 对象
        m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        
        // 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
        errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Topic Conf set failed: " << errorStr << std::endl;
        }
    
        // 默认 topic 配置,用于自动订阅 topics
        errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
        if (errorCode != RdKafka::Conf::CONF_OK) {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // 2、创建 Consumer 对象
        m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
        if (m_consumer == NULL) {
            std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
        }
        std::cout << "Created consumer " << m_consumer->name() << std::endl;
    }
    
    // 消费消息
    void msg_consume(RdKafka::Message *msg, void *opaque) {
        switch (msg->err()) {
        case RdKafka::ERR__TIMED_OUT:
            // std::cerr << "Consumer error: " << msg->errstr() << std::endl; //
            // 超时
            break;
        case RdKafka::ERR_NO_ERROR: // 有消息进来
            std::cout << " Message in-> topic:" << msg->topic_name()
                      << ", partition:[" << msg->partition() << "] at offset "
                      << msg->offset() << " key: " << msg->key()
                      << " payload: " << (char *)msg->payload() << std::endl;
            break;
        default:
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;
            break;
        }
    }
    
    // 拉取消息并消费
    void KafkaConsumer::pullMessage() {
        // 1、订阅主题
        RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
        if (errorCode != RdKafka::ERR_NO_ERROR) {
            std::cout << "subscribe failed: " << RdKafka::err2str(errorCode)
                      << std::endl;
        }
        // 2、拉取并消费消息
        while (true) {
            RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
            // 消费消息
            msg_consume(msg, NULL);
            
            // 开启手动提交
            // 1、异步提交,阶段性提交
            // m_consumer->commitAsync(); 
            delete msg;
        }
        // 2、同步提交,Consumer 关闭前调用,等待 broker 返回读取消息
        m_consumer->commitSync(); 
    }
    
    KafkaConsumer::~KafkaConsumer() {
        m_consumer->close();
        delete m_config;
        delete m_topicConfig;
        delete m_consumer;
        delete m_event_cb;
        delete m_rebalance_cb;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    4、测试结果

    先启动消费者1,分区分配情况如下所示

    RebalanceCb: Local: Assign partitions: kafka-2[0], kafka-2[1], 
    
    • 1

    当启动消费者2,消费者组成员变更,触发 rebalance 机制

    // 消费者1
    RebalanceCb: Local: Revoke partitions: kafka-2[0], kafka-2[1], 
    RebalanceCb: Local: Assign partitions: kafka-2[1], 
    
    // 消费者2
    RebalanceCb: Local: Assign partitions: kafka-2[0], 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    开启生产者,key-ordering 方式投递 1000 个元素,消费者消费状态如下所示

    // 消费者1
     Message in-> topic:kafka-2, partition:[1] at offset 2048 key: 0x1da35d0 payload: Hello RdKafka sh kafka-topics.sh msg  996
     Message in-> topic:kafka-2, partition:[1] at offset 2049 key: 0x1da35d0 payload: Hello RdKafka sh kafka-topics.sh msg  998
    
    // 消费者2
     Message in-> topic:kafka-2, partition:[0] at offset 2048 key: 0x1330090 payload: Hello RdKafka sh kafka-topics.sh msg  997
     Message in-> topic:kafka-2, partition:[0] at offset 2049 key: 0x1330090 payload: Hello RdKafka sh kafka-topics.sh msg  999
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5、参考

  • 相关阅读:
    运筹学基础【四】 之 库存管理
    高手必备!电脑剪辑视频的实用方法
    TensorFlow搭建LSTM实现多变量时间序列预测(负荷预测)
    Python划分训练集与测试集KFold交叉验证
    LDA(Fisher)线性判别分析
    Android 编译插桩操纵字节码
    软件项目管理实践指南:有效规划、执行和控制
    C++初阶-类和对象(中)1
    x86和arm框架下的centOS
    【复杂网络】网络科学导论学习笔记-第五章节点重要性与相似性
  • 原文地址:https://blog.csdn.net/you_fathe/article/details/128192430