• C++实现kafka的生产者客户端


    一、Kafka 生产者的逻辑

    配置客户端参数
    创建生产者实例
    构建待发送消息
    发送消息
    关闭实例

    (1)配置生产者客户端参数。
    (2)创建相应的生产者实例。
    (3)构建待发送的消息。
    (4)发送消息。
    (5)关闭生产者实例。

    二、Kafka 的C++ API

    2.1、RdKafka::Conf

    enum ConfType{ 
    	CONF_GLOBAL, 	// 全局配置 
    	CONF_TOPIC 		// Topic配置 
    };
    enum ConfResult{ 
    	CONF_UNKNOWN = -2, 
    	CONF_INVALID = -1, 
    	CONF_OK = 0 
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. static Conf * create(ConfType type);
      创建配置对象。

    2. Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
      设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。

    3. Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
      设置dr_cb属性值。

    4. Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
      设置event_cb属性值。

    5. Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
      设置用于自动订阅Topic的默认Topic配置。

    6. Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
      设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。

    7. Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
      设置partitioner_key_pointer_cb属性值。

    8. Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
      设置socket_cb属性值。

    9. Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
      设置open_cb属性值。

    10. Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
      设置rebalance_cb属性值。

    11. Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
      设置offset_commit_cb属性值。

    12. Conf::ConfResult get(const std::string &name, std::string &value) const;
      查询单条属性配置值。

    2.2、RdKafka::Message

    Message表示一条消费或生产的消息,或是事件。

    1. std::string errstr() const;
      如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
    2. ErrorCode err() const;
      如果消息是一条错误事件,返回错误代码,否则返回0。
    3. Topic * topic() const;
      返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
    4. std::string topic_name() const;
      返回消息的Topic名称。
    5. int32_t partition() const;
      如果分区可用,返回分区号。
    6. void * payload() const;
      返回消息数据。
    7. size_t len() const;
      返回消息数据的长度。
    8. const std::string * key() const;
      返回字符串类型的消息key。
    9. const void * key_pointer() const;
      返回void类型的消息key。
    10. size_t key_len() const;
      返回消息key的二进制长度。
    11. int64_t offset () const;
      返回消息或错误的位移。
    12. void * msg_opaque() const;
      返回RdKafka::Producer::produce()提供的msg_opaque。
    13. virtual MessageTimestamp timestamp() const = 0;
      返回消息时间戳。
    14. virtual int64_t latency() const = 0;
      返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
    15. virtual struct rd_kafka_message_s *c_ptr () = 0;
      返回底层数据结构的C rd_kafka_message_t句柄。
    16. virtual Status status () const = 0;
      返回消息在Topic Log的持久化状态。
    17. virtual RdKafka::Headers *headers () = 0;
      返回消息头。
    18. virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
      返回消息头,错误信息会输出到err。

    2.3、RdKafka::DeliveryReportCb

    每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。
    为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。

    virtual void dr_cb(Message &message)=0;
    
    • 1

    当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。

    C++封装示例:

    class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
    {
    public:
    	void dr_cb(RdKafka::Message &message)
    	{
    		if(message.err())
    			std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    		else
    		{
    			// Message delivered to topic test [0] at offset 135000
    			std::cerr << "Message delivered to topic " << message.topic_name()
    				<< " [" << message.partition() << "] at offset "
    				<< message.offset() << std::endl;
    		}
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.4、RdKafka::Event

    enum Type{ 
    	EVENT_ERROR, //错误条件事件 
    	EVENT_STATS, // Json文档统计事件 
    	EVENT_LOG, // Log消息事件 
    	EVENT_THROTTLE // 来自Broker的throttle级信号事件 
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. virtual Type type() const =0;
      返回事件类型。
    2. virtual ErrorCode err() const =0;
      返回事件错误代码。
    3. virtual Severity severity() const =0;
      返回log严重级别。
    4. virtual std::string fac() const =0;
      返回log基础字符串。
    5. virtual std::string str () const =0;
      返回Log消息字符串。
    6. virtual int throttle_time() const =0;
      返回throttle时间。
    7. virtual std::string broker_name() const =0;
      返回Broker名称。
    8. virtual int broker_id() const =0;
      返回Broker ID。

    2.5、RdKafka::EventCb

    事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

    virtual void event_cb(Event &event)=0; //  事件回调函数
    
    • 1

    C++封装示例:

    class ProducerEventCb : public RdKafka::EventCb
    {
    public:
        void event_cb(RdKafka::Event &event)
        {
            switch(event.type())
            {
            case RdKafka::Event::EVENT_ERROR:
                std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
                break;
            }
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.6、RdKafka::PartitionerCb

    PartitionerCb用实现自定义分区策略,需要使用RdKafka::Conf::set()设置partitioner_cb属性。

    virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt,void *msg_opaque)=0;
    //Partitioner回调函数
    
    • 1
    • 2

    返回topic主题中使用key的分区,key可以是NULL或字符串。
    返回值必须在0到partition_cnt间,如果分区失败可能返回RD_KAFKA_PARTITION_UA (-1)。
    msg_opaque与RdKafka::Producer::produce()调用提供的msg_opaque相同。

    C++封装示例:

    class HashPartitionerCb : public RdKafka::PartitionerCb
    {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                                int32_t partition_cnt, void *msg_opaque)
        {
            char msg[128] = {0};
            int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
            //                          [topic][key][partition_cnt][partition_id] 
            //                          :[test][6419][2][1]
            sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),       
                    key->c_str(), partition_cnt, partition_id);
            std::cout << msg << std::endl;
            return partition_id;
        }
    private:
    
        static inline unsigned int generate_hash(const char *str, size_t len)
        {
            unsigned int hash = 5381;
            for (size_t i = 0 ; i < len ; i++)
                hash = ((hash << 5) + hash) + str[i];
            return hash;
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2.7、RdKafka::Topic

    1. static Topic * create(Handle *base, const std::string &topic_str, Conf *conf, std::string &errstr);
      使用conf配置创建名为topic_str的Topic句柄。
    2. const std::string name ();
      获取Topic名称。
    3. bool partition_available(int32_t partition) const;
      获取parition分区是否可用,只能在 RdKafka::PartitionerCb回调函数内被调用。
    4. ErrorCode offset_store(int32_t partition, int64_t offset);
      存储Topic的partition分区的offset位移,只能用于RdKafka::Consumer,不能用于RdKafka::KafkaConsumer高级接口类。使用本接口时,auto.commit.enable参数必须设置为false。
    5. virtual struct rd_kafka_topic_s *c_ptr () = 0;
      返回底层数据结构的rd_kafka_topic_t句柄,不推荐利用rd_kafka_topic_t句柄调用C API,但如果C++ API没有提供相应功能,可以直接使用C API和librdkafka核心交互。
    static const int32_t PARTITION_UA = -1;		//未赋值分区
    static const int64_t OFFSET_BEGINNING = -2;	//特殊位移,从开始消费
    static const int64_t OFFSET_END = -1;		//特殊位移,从末尾消费
    static const int64_t OFFSET_STORED = -1000;	//使用offset存储
    
    • 1
    • 2
    • 3
    • 4

    2.8、RdKafka::Producer(核心)

    1. static Producer * create(Conf *conf, std::string &errstr);
      创建一个新的Producer客户端对象,conf用于替换默认配置对象,本函数调用后conf可以重用。成功返回新的Producer客户端对象,失败返回NULL,errstr可读错误信息。
    2. ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,const std::string *key, void *msg_opaque);
      生产和发送单条消息到Broker。msgflags:可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。
    参数含义
    topic主题
    partition分区
    msgflags可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。RK_MSG_FREE表 示RdKafka调用produce完成后会释放payload数据;RK_MSG_COPY表示payload数据会被拷贝,在produce调用完成后RdKafka不会使用payload指针;RK_MSG_BLOCK表示在消息队列满时阻塞produce函数,如果dr_cb回调函数被使用,应用程序必须调用rd_kafka_poll函数确保投递消息队列的投递消息投递完。当消息队列满时,失败会导致produce函数的永久阻塞。RK_MSG_FREE和RK_MSG_COPY是互斥操作。如果produce函数调用时指定了RK_MSG_FREE,并返回了错误码,与payload指针相关的内存数据必须由使用者负责释放。
    payload长度为len的消息负载数据
    lenpayload消息数据的长度。
    keykey是可选的消息key,如果非NULL,会被传递给主题partitioner,并被随消息发送到Broker和传递给Consumer。
    msg_opaquemsg_opaque是可选的应用程序提供给每条消息的opaque指针,opaque指针会在dr_cb回调函数内提供。

    返回错误码:

    错误码含义
    ERR_NO_ERROR消息成功发送并入对列。
    ERR_QUEUE_FULL最大消息数量达到queue.buffering.max.message。
    ERR_MSG_SIZE_TOO_LARGE消息数据大小太大,超过messages.max.bytes配置的值。
    ERR_UNKNOWN_PARTITION请求一个Kafka集群内的未知分区。
    ERR_UNKNOWN_TOPICtopic是Kafka集群的未知主题。
    1. ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,const void *key, size_t key_len, void *msg_opaque);
      生产和发送单条消息到Broker,传递key数据指针和key长度。
    2. ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque);
      生产和发送单条消息到Broker,传递消息数组和key数组。接受数组类型的key和payload,数组会被复制。
    3. ErrorCode flush (int timeout_ms);
      等待所有未完成的所有Produce请求完成。为了确保所有队列和已经执行的Produce请求在中止前完成,flush操作优先于销毁生产者实例完成。本函数会调用Producer::poll()函数,因此会触发回调函数。
    4. ErrorCode purge (int purge_flags);
      清理生产者当前处理的消息。本函数调用时可能会阻塞一定时间,当后台线程队列在清理时。应用程序需要在调用poll或flush函数后,执行清理消息的dr_cb回调函数。
    5. virtual Error *init_transactions (int timeout_ms) = 0;
      初始化Producer实例的事务。失败返回RdKafka::Error错误对象,成功返回NULL。
      通过调用RdKafka::Error::is_retriable()函数可以检查返回的错误对象是否有权限重试,调用
      RdKafka::Error::is_fatal()检查返回的错误对象是否是严重错误。返回的错误对象必须elete。
    6. virtual Error *begin_transaction () = 0;
      启动事务。本函数调用前,init_transactions()函数必须被成功调用。
      成功返回NULL,失败返回错误对象。通过调用RdKafka::Error::is_fatal_error()函数可以检查是否是严重错误,返回的错误对象必须delete。
    7. virtual Error send_offsets_to_transaction (const std::vector &offsets,const ConsumerGroupMetadata *group_metadata,int timeout_ms) = 0;
      发送TopicPartition位移链表到由group_metadata指定的Consumer Group协调器,如果事务提交成功,位移才会被提交。
    8. virtual Error *commit_transaction (int timeout_ms) = 0;
      提交当前事务。在实际提交事务时,任何未完成的消息会被完成投递。
      成功返回NULL,失败返回错误对象。通过调用错误对象的方法可以检查是否有权限重试,是否是严重错误、可中止错误等。
    9. virtual Error *abort_transaction (int timeout_ms) = 0;
      停止事务。本函数从非严重错误、可终止事务中用于恢复。未完成消息会被清理。

    三、Kafka 生产者客户端开发

    3.1、必要的参数配置(bootstrap.servers)

    (1)指定连接 Kafka 集群所需要的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号进行隔开,此参数的默认值为 “”。
    (2)注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息。
    (3)过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。

    // 创建Kafka Conf对象
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == NULL)
    {
        std::cout << "Create RdKafka Conf failed." << std::endl;
    }
    // 创建Topic Conf对象
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(m_topicConfig == NULL)
    {
        std::cout << "Create RdKafka Topic Conf failed." << std::endl;
    }
    // 设置Broker属性
    RdKafka::Conf::ConfResult errCode;
    m_dr_cb = new ProducerDeliveryReportCb;
    std::string errorStr;
    errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    m_event_cb = new ProducerEventCb;
    errCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    
    m_partitioner_cb = new HashPartitionerCb;
    errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    
    errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    
    errCode = m_config->set("message.max.bytes", "10240000", errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    3.2、创建生产者实例

    生产者的相关配置和实例的创建可以在类的构造函数实现。比如Kafka Conf对象、Topic Conf对象、设置Broker属性、Producer、Topic对象等。

    // 创建Producer
    m_producer = RdKafka::Producer::create(m_config, errorStr);
    if(m_producer == NULL)
    {
        std::cout << "Create Producer failed:" << errorStr << std::endl;
    }
    // 创建Topic对象
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
    if(m_topic == NULL)
    {
        std::cout << "Create Topic failed:" << errorStr << std::endl;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.3、消息发送

    librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。
    同一个生产者可以发送多个主题的,在内部处理时根据传入的topic对象发送给对应的主题分区。

     RdKafka::ErrorCode errorCode = m_producer->produce(
     									m_topic, 
     									RdKafka::Topic::PARTITION_UA,
                                       	RdKafka::Producer::RK_MSG_COPY,
                                       	payload, 
                                       	len, 
                                       	&key, 
                                       	NULL);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.4、完整示例代码

    KafkaProducer.h

    #ifndef KAFKAPRODUCER_H
    #define KAFKAPRODUCER_H
    
    #pragma once
    #include 
    #include 
    #include "rdkafkacpp.h"
    
    class KafkaProducer
    {
    public:
    	/**
    	* @brief KafkaProducer
    	* @param brokers
    	* @param topic
    	* @param partition
    	*/
    	explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);
    	/**
    	* @brief push Message to Kafka
    	* @param str, message data
    	*/
    	void pushMessage(const std::string& str, const std::string& key);
    	~KafkaProducer();
    
    private:
    	std::string m_brokers;			// Broker列表,多个使用逗号分隔
    	std::string m_topicStr;			// Topic名称
    	int m_partition;				// 分区
    
    	RdKafka::Conf* m_config;        // Kafka Conf对象
    	RdKafka::Conf* m_topicConfig;   // Topic Conf对象
    	RdKafka::Topic* m_topic;		// Topic对象
    	RdKafka::Producer* m_producer;	// Producer对象
    
    	/*只要看到Cb 结尾的类,要继承它然后实现对应的回调函数*/
    	RdKafka::DeliveryReportCb* m_dr_cb;
    	RdKafka::EventCb* m_event_cb;
    	RdKafka::PartitionerCb* m_partitioner_cb;
    };
    
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    KafkaProducer.cpp

    #include "KafkaProducer.h"
    
    // call back
    class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
    {
    public:
    	void dr_cb(RdKafka::Message &message)
    	{
    		if(message.err())
    			std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    		else
    		{
    			// Message delivered to topic test [0] at offset 135000
    			std::cerr << "Message delivered to topic " << message.topic_name()
    				<< " [" << message.partition() << "] at offset "
    				<< message.offset() << std::endl;
    		}
    	}
    };
    
    class ProducerEventCb : public RdKafka::EventCb
    {
    public:
    	void event_cb(RdKafka::Event &event)
    	{
    		switch (event.type())
    		{
    		case RdKafka::Event::EVENT_ERROR:
    			std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
    			break;
    		case RdKafka::Event::EVENT_STATS:
    			std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
    			break;
    		case RdKafka::Event::EVENT_LOG:
    			std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
    			break;
    		case RdKafka::Event::EVENT_THROTTLE:
    			std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
    			break;
    		}
    	}
    };
    
    class HashPartitionerCb : public RdKafka::PartitionerCb
    {
    public:
    	int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
    		int32_t partition_cnt, void *msg_opaque)
    	{
    		char msg[128] = { 0 };
    		int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
    		//                          [topic][key][partition_cnt][partition_id] 
    		//                          :[test][6419][2][1]
    		sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),
    			key->c_str(), partition_cnt, partition_id);
    		std::cout << msg << std::endl;
    		return partition_id;
    	}
    private:
    
    	static inline unsigned int generate_hash(const char *str, size_t len)
    	{
    		unsigned int hash = 5381;
    		for (size_t i = 0; i < len; i++)
    			hash = ((hash << 5) + hash) + str[i];
    		return hash;
    	}
    };
    
    
    KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
    {
    	m_brokers = brokers;
    	m_topicStr = topic;
    	m_partition = partition;
    
    	/* 创建Kafka Conf对象 */
    	m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    	if(m_config==NULL)
    		std::cout << "Create RdKafka Conf failed." << std::endl;
    
    	/* 创建Topic Conf对象 */
    	m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    	if (m_topicConfig == NULL)
    		std::cout << "Create RdKafka Topic Conf failed." << std::endl;
    
    	/* 设置Broker属性 */
    	RdKafka::Conf::ConfResult errCode;
    	std::string errorStr;
    	m_dr_cb = new ProducerDeliveryReportCb;
    	// 设置dr_cb属性值
    	errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    	// 设置event_cb属性值
    	m_event_cb = new ProducerEventCb;
    	errCode = m_config->set("event_cb", m_event_cb, errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    	// 自定义分区策略
    	m_partitioner_cb = new HashPartitionerCb;
    	errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    	// 设置配置对象的属性值
    	errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    	errCode = m_config->set("message.max.bytes", "10240000", errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    	errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    	if (errCode != RdKafka::Conf::CONF_OK)
    	{
    		std::cout << "Conf set failed:" << errorStr << std::endl;
    	}
    
    	/* 创建Producer */
    	m_producer = RdKafka::Producer::create(m_config, errorStr);
    	if (m_producer == NULL)
    	{
    		std::cout << "Create Producer failed:" << errorStr << std::endl;
    	}
    
    	/* 创建Topic对象 */
    	m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
    	if (m_topic == NULL)
    	{
    		std::cout << "Create Topic failed:" << errorStr << std::endl;
    	}
    }
    
    KafkaProducer::~KafkaProducer()
    {
    	while (m_producer->outq_len() > 0)
    	{
    		std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
    		m_producer->flush(5000);
    	}
    	delete m_config;
    	delete m_topicConfig;
    	delete m_topic;
    	delete m_producer;
    	delete m_dr_cb;
    	delete m_event_cb;
    	delete m_partitioner_cb;
    }
    
    void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
    {
    	int32_t len = str.length();
    	void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
    	RdKafka::ErrorCode errorCode = m_producer->produce(
    		m_topic,
    		RdKafka::Topic::PARTITION_UA,
    		RdKafka::Producer::RK_MSG_COPY,
    		payload,
    		len,
    		&key,
    		NULL);
    	m_producer->poll(0);
    	if (errorCode != RdKafka::ERR_NO_ERROR)
    	{
    		std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
    		if (errorCode == RdKafka::ERR__QUEUE_FULL)
    		{
    			m_producer->poll(100);
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180

    CMakeLists.txt

    cmake_minimum_required(VERSION 2.8)
    
    project(KafkaProducer)
    
    set(CMAKE_CXX_STANDARD 11)
    set(CMAKE_CXX_COMPILER "g++")
    set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
    set(CMAKE_INCLUDE_CURRENT_DIR ON)
    
    # Kafka头文件路径
    include_directories(/usr/local/include/librdkafka)
    # Kafka库路径
    link_directories(/usr/local/lib)
    
    aux_source_directory(. SOURCE)
    
    add_executable(${PROJECT_NAME} ${SOURCE})
    TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    测试文件main.cpp

    #include 
    #include "KafkaProducer.h"
    using namespace std;
    
    int main()
    {
        // 创建Producer
        // KafkaProducer producer("127.0.0.1:9092,192.168.2.111:9092", "test", 0);
        KafkaProducer producer("127.0.0.1:9092", "test", 0);
        for(int i = 0; i < 10000; i++)
        {
            char msg[64] = {0};
            sprintf(msg, "%s%4d", "Hello RdKafka ", i);
            // 生产消息
            char key[8] = {0};      // 主要用来做负载均衡
            sprintf(key, "%d", i);
            producer.pushMessage(msg, key);  
        }
        RdKafka::wait_destroyed(5000);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    编译:

    mkdir build
    cd build
    cmake ..
    make
    
    • 1
    • 2
    • 3
    • 4

    总结

    Kafka Producer使用流程:

    1. 创建Kafka配置实例。
    2. 创建Topic配置实例。
    3. 设置Kafka配置实例Broker属性。
    4. 设置Topic配置实例属性。
    5. 注册回调函数(分区策略回调函数需要注册到Topic配置实例)。
    6. 创建Kafka Producer客户端实例。
    7. 创建Topic实例。
    8. 生产消息。
    9. 阻塞等待Producer生产消息完成。
    10. 等待Produce请求完成。
    11. 销毁Kafka Producer客户端实例。
      在这里插入图片描述
  • 相关阅读:
    Web模块
    使用nodel实现前后端数据渲染
    Linux中比cp好用10倍的rsync,你会用了吗
    php设计模式之单例模式详解
    CISC和RISC的比较
    笔记记录--基于ccpd数据集利用Paddle OCR训练车牌检测模型
    负载均衡技术全景:理论、实践与案例研究
    Seata介绍
    安卓窗体显示状态
    P4 开发实践 — NG-SDN Tutorial — Exercise 6: Segment Routing v6 (SRv6)
  • 原文地址:https://blog.csdn.net/Long_xu/article/details/128092164