• Installing librdkafka and an example of working with Kafka in C


    1. Installing librdkafka

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    git checkout v1.7.0
    ./configure
    make
    sudo make install
    sudo ldconfig
    

    The examples directory of librdkafka contains sample programs. The consumer, for instance, is started with the following arguments:

    % Usage: ./consumer <broker> <group.id> <topic1> <topic2>..
    

    Pass the broker, the group id, and one or more topics to subscribe to. Example:

    ./consumer localhost:9092 0 test
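
    The way these arguments map onto broker, group id, and topic list can be sketched in isolation. The struct consumer_args and the function parse_consumer_args below are hypothetical names introduced only for illustration; the sketch follows the same argv layout as the usage line above and does not depend on librdkafka.

```c
#include <stddef.h>

/* Hypothetical container for the consumer's command-line arguments. */
struct consumer_args {
        const char *brokers;   /* argv[1]: bootstrap broker list */
        const char *group_id;  /* argv[2]: consumer group id */
        char **topics;         /* argv[3..]: topics to subscribe to */
        int topic_cnt;         /* number of topics */
};

/* Returns 0 on success, -1 if broker, group id, or topic is missing. */
static int parse_consumer_args (int argc, char **argv,
                                struct consumer_args *out) {
        if (argc < 4)
                return -1;

        out->brokers   = argv[1];
        out->group_id  = argv[2];
        out->topics    = &argv[3];   /* points into argv, no copy is made */
        out->topic_cnt = argc - 3;
        return 0;
}
```

    With the example command above, brokers would be localhost:9092, group_id would be 0, and the topic list would hold the single entry test.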
    

    Abbreviations used in the librdkafka source:

    Abbreviation  Full name          Example or note
    rd            Rapid Development  rd.h
    rk            RdKafka
    toppar        Topic Partition    struct rd_kafka_toppar_t { };
    rep           Reply              struct rd_kafka_t { rd_kafka_q_t *rk_rep };
    msgq          Message Queue      struct rd_kafka_msgq_t { };
    rkb           RdKafka Broker     Kafka broker
    rko           RdKafka Operation  Kafka operation
    rkm           RdKafka Message    Kafka message
    payload                          Message stored in Kafka (also called a log)

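    2. Starting the Kafka services

    2.1 Starting ZooKeeper

    ZooKeeper can be started with the script below. You can also set up a separate ZooKeeper cluster of your own; here we simply use the ZooKeeper that ships with Kafka.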

    cd bin/
    # Run in the foreground:
    sh zookeeper-server-start.sh  ../config/zookeeper.properties
    
    # Run in the background:
    sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
    
    

    Run lsof -i:2181 to check whether ZooKeeper started successfully.

    $ lsof -i:2181
    COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
    java    74930  fly   96u  IPv6 734467      0t0  TCP *:2181 (LISTEN)
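
    The same check can be made from C by attempting a TCP connection to the port. This is a minimal sketch assuming a POSIX system and an IPv4 loopback address; tcp_port_open is a hypothetical helper name, and the blocking connect() has no timeout, so it is only suitable for local checks.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns 1 if a TCP connection to ip:port succeeds, else 0.
 * `ip` must be a dotted-quad IPv4 address such as "127.0.0.1". */
static int tcp_port_open (const char *ip, unsigned short port) {
        struct sockaddr_in addr;
        int fd, ok;

        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port   = htons(port);
        if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1)
                return 0; /* not a valid IPv4 address */

        fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0)
                return 0;

        /* A successful connect means some process is listening. */
        ok = connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0;
        close(fd);
        return ok;
}
```

    For the setup in this article, tcp_port_open("127.0.0.1", 2181) would probe ZooKeeper and tcp_port_open("127.0.0.1", 9092) would probe the Kafka broker.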
    

    2.2 Starting Kafka

    Start Kafka from the bin directory of the Kafka installation. The default port is 9092.

    sh kafka-server-start.sh -daemon ../config/server.properties
    

    2.3 Creating a topic

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    

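    Parameters:

    --create is the action that creates a topic.
    --zookeeper gives the address of the ZooKeeper service that Kafka connects to. (Kafka 3.0 and later removed this option from kafka-topics.sh; use --bootstrap-server localhost:9092 there instead.)
    --replication-factor sets the replication factor, that is, how many brokers keep a copy of each partition of the topic. Here it is 1, so each partition is stored exactly once, on a single broker.
    --partitions sets the number of partitions. Partitions are parallel channels, much like lanes on a road.
    --topic names the topic to create, for example test.
    On success the command prints: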

    Created topic "test".
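
    To make the relationship between partitions, replication factor, and brokers concrete, here is a simplified sketch that places the replicas of one partition on brokers in round-robin order. The function place_replicas is a hypothetical name, and the round-robin rule is an assumption made for illustration; Kafka's actual placement logic is more involved. The one constraint the sketch does share with Kafka is that every replica of a partition needs its own broker, so a replication factor larger than the broker count cannot be satisfied.

```c
/* Fills out[0..replication_factor-1] with the broker ids that would hold
 * copies of `partition`, using plain round-robin placement.
 * Returns 0 on success, -1 if the request cannot be satisfied. */
static int place_replicas (int partition, int replication_factor,
                           int broker_cnt, int *out) {
        int r;

        if (partition < 0 || replication_factor < 1 || broker_cnt < 1)
                return -1;

        /* Each replica must live on a different broker. */
        if (replication_factor > broker_cnt)
                return -1;

        for (r = 0 ; r < replication_factor ; r++)
                out[r] = (partition + r) % broker_cnt;

        return 0;
}
```

    With the single-broker setup used here, place_replicas(0, 1, 1, out) succeeds and puts the only copy on broker 0, while a replication factor of 2 is refused.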
    

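    3. Example of working with Kafka in C

    3.1 Consumer

    The librdkafka/examples directory contains consumer.c, a sample program that shows how to consume from Kafka in C. Its content is as follows.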

    /**
     * Simple high-level balanced Apache Kafka consumer
     * using the Kafka driver from librdkafka
     * (https://github.com/edenhill/librdkafka)
     */
    
    #include <stdio.h>
    #include <signal.h>
    #include <string.h>
    #include <ctype.h>
    
    
    /* Typical include path would be <librdkafka/rdkafka.h>, but this program
     * is builtin from within the librdkafka source tree and thus differs. */
    //#include <librdkafka/rdkafka.h>
    #include "rdkafka.h"
    
    
    static volatile sig_atomic_t run = 1;
    
    /**
     * @brief Signal termination of program
     */
    static void stop (int sig) {
            run = 0;
    }
    
    
    
    /**
     * @returns 1 if all bytes are printable, else 0.
     */
    static int is_printable (const char *buf, size_t size) {
            size_t i;
    
            for (i = 0 ; i < size ; i++)
                    if (!isprint((int)buf[i]))
                            return 0;
    
            return 1;
    }
    
    
    int main (int argc, char **argv) {
            rd_kafka_t *rk;          /* Consumer instance handle */
            rd_kafka_conf_t *conf;   /* Temporary configuration object */
            rd_kafka_resp_err_t err; /* librdkafka API error code */
            char errstr[512];        /* librdkafka API error reporting buffer */
            const char *brokers;     /* Argument: broker list */
            const char *groupid;     /* Argument: Consumer group id */
            char **topics;           /* Argument: list of topics to subscribe to */
            int topic_cnt;           /* Number of topics to subscribe to */
            rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
            int i;
    
            /*
             * Argument validation
             */
            if (argc < 4) {
                    fprintf(stderr,
                            "%% Usage: "
                            "%s <broker> <group.id> <topic1> <topic2>..\n",
                            argv[0]);
                    return 1;
            }
    
            brokers   = argv[1];
            groupid   = argv[2];
            topics    = &argv[3];
            topic_cnt = argc - 3;
    
    
            /*
             * Create Kafka client configuration place-holder
             */
            conf = rd_kafka_conf_new();	// create the configuration object
    
            /* Set bootstrap broker(s) as a comma-separated list of
             * host or host:port (default port 9092).
             * librdkafka will use the bootstrap brokers to acquire the full
             * set of brokers from the cluster. */
            if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return 1;
            }
    
            /* Set the consumer group id.
             * All consumers sharing the same group id will join the same
             * group, and the subscribed topic' partitions will be assigned
             * according to the partition.assignment.strategy
             * (consumer config property) to the consumers in the group. */
            if (rd_kafka_conf_set(conf, "group.id", groupid,
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return 1;
            }
    
            /* If there is no previously committed offset for a partition
             * the auto.offset.reset strategy will be used to decide where
             * in the partition to start fetching messages.
             * By setting this to earliest the consumer will read all messages
             * in the partition if there was no previously committed offset. */
            if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    rd_kafka_conf_destroy(conf);
                    return 1;
            }
    
            /*
             * Create consumer instance.
             *
             * NOTE: rd_kafka_new() takes ownership of the conf object
             *       and the application must not reference it again after
             *       this call.
             */
             // create the Kafka consumer
            rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
            if (!rk) {
                    fprintf(stderr,
                            "%% Failed to create new consumer: %s\n", errstr);
                    return 1;
            }
    
            conf = NULL; /* Configuration object is now owned, and freed,
                          * by the rd_kafka_t instance. */
    
    
            /* Redirect all messages from per-partition queues to
             * the main queue so that messages can be consumed with one
             * call from all assigned partitions.
             *
             * The alternative is to poll the main queue (for events)
             * and each partition queue separately, which requires setting
             * up a rebalance callback and keeping track of the assignment:
             * but that is more complex and typically not recommended. */
            rd_kafka_poll_set_consumer(rk);// route this consumer's events through rd_kafka_consumer_poll()
    
    
            /* Convert the list of topics to a format suitable for librdkafka */
            // create the topic/partition list
            subscription = rd_kafka_topic_partition_list_new(topic_cnt);
            for (i = 0 ; i < topic_cnt ; i++)
                    rd_kafka_topic_partition_list_add(subscription,
                                                      topics[i],
                                                      /* the partition is ignored
                                                       * by subscribe() */
                                                      RD_KAFKA_PARTITION_UA);
    
            /* Subscribe to the list of topics */
            err = rd_kafka_subscribe(rk, subscription);
            if (err) {
                    fprintf(stderr,
                            "%% Failed to subscribe to %d topics: %s\n",
                            subscription->cnt, rd_kafka_err2str(err));
                    rd_kafka_topic_partition_list_destroy(subscription);
                    rd_kafka_destroy(rk);
                    return 1;
            }
    
            fprintf(stderr,
                    "%% Subscribed to %d topic(s), "
                    "waiting for rebalance and messages...\n",
                    subscription->cnt);
    
            rd_kafka_topic_partition_list_destroy(subscription);
    
    
            /* Signal handler for clean shutdown */
            signal(SIGINT, stop);
    
            /* Subscribing to topics will trigger a group rebalance
             * which may take some time to finish, but there is no need
             * for the application to handle this idle period in a special way
             * since a rebalance may happen at any time.
             * Start polling for messages. */
    
            while (run) {
                    rd_kafka_message_t *rkm;
    				
                    rkm = rd_kafka_consumer_poll(rk, 100);
                    if (!rkm)
                            continue; /* Timeout: no message within 100ms,
                                       *  try again. This short timeout allows
                                       *  checking for `run` at frequent intervals.
                                       */
    
                    /* consumer_poll() will return either a proper message
                     * or a consumer error (rkm->err is set). */
                    if (rkm->err) {
                            /* Consumer errors are generally to be considered
                             * informational as the consumer will automatically
                             * try to recover from all types of errors. */
                            fprintf(stderr,
                                    "%% Consumer error: %s\n",
                                    rd_kafka_message_errstr(rkm));
                            rd_kafka_message_destroy(rkm);
                            continue;
                    }
    
                    /* Proper message. */
                    printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
                           rd_kafka_topic_name(rkm->rkt), rkm->partition,
                           rkm->offset);
    
                    /* Print the message key. */
                    if (rkm->key && is_printable(rkm->key, rkm->key_len))
                            printf(" Key: %.*s\n",
                                   (int)rkm->key_len, (const char *)rkm->key);
                    else if (rkm->key)
                            printf(" Key: (%d bytes)\n", (int)rkm->key_len);
    
                    /* Print the message value/payload. */
                    if (rkm->payload && is_printable(rkm->payload, rkm->len))
                            printf(" Value: %.*s\n",
                                   (int)rkm->len, (const char *)rkm->payload);
                    else if (rkm->payload)
                            printf(" Value: (%d bytes)\n", (int)rkm->len);
    
                    rd_kafka_message_destroy(rkm);
            }
    
    
            /* Close the consumer: commit final offsets and leave the group. */
            fprintf(stderr, "%% Closing consumer\n");
            rd_kafka_consumer_close(rk);
    
    
            /* Destroy the consumer */
            rd_kafka_destroy(rk);
    
            return 0;
    }
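
    One detail in the listing deserves a note: is_printable() passes a plain char to isprint(). On platforms where char is signed, bytes of 0x80 and above become negative values, for which the C standard leaves the behavior of isprint() undefined. The variant below is a sketch of a more defensive version (is_printable_safe is a hypothetical name, not part of librdkafka) that reads the buffer as unsigned char.

```c
#include <ctype.h>
#include <stddef.h>

/* Returns 1 if all `size` bytes of `buf` are printable, else 0.
 * Reading the bytes as unsigned char keeps every value passed to
 * isprint() inside the range the C standard allows. */
static int is_printable_safe (const void *buf, size_t size) {
        const unsigned char *p = buf;
        size_t i;

        for (i = 0 ; i < size ; i++)
                if (!isprint(p[i]))
                        return 0;

        return 1;
}
```

    Because the parameter is a const void *, rkm->key and rkm->payload could be passed without a cast.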
    
    
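    Function call                           Meaning
    rd_kafka_conf_new()                     Creates the configuration object
    rd_kafka_conf_set(…)                    Sets a configuration property such as bootstrap.servers, group.id, or auto.offset.reset
    rd_kafka_new(…)                         Creates the Kafka consumer (RD_KAFKA_CONSUMER)
    rd_kafka_poll_set_consumer(…)           Redirects the main queue to the consumer queue so that one poll call serves both
    rd_kafka_topic_partition_list_new(…)    Creates the topic/partition list
    rd_kafka_topic_partition_list_add(…)    Adds a topic to the list; call it once per subscribed topic
    rd_kafka_subscribe(…)                   Subscribes to the topics in the list
    rd_kafka_consumer_poll(…)               Polls for messages, with a timeout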
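
    The auto.offset.reset comment in the listing describes a small decision rule that is easy to miss. The sketch below models it outside of librdkafka: start_offset, enum reset_policy, and NO_COMMITTED_OFFSET are hypothetical names chosen for illustration, and the model assumes the policy also applies when the committed offset falls outside the range the partition still holds.

```c
#include <stdint.h>

/* Hypothetical marker meaning "the group has no committed offset". */
#define NO_COMMITTED_OFFSET (-1)

enum reset_policy { RESET_EARLIEST, RESET_LATEST };

/* Decides the first offset to fetch for one partition.
 *  committed: last committed offset, or NO_COMMITTED_OFFSET
 *  low:       oldest offset still stored in the partition
 *  high:      offset that the next produced message will get */
static int64_t start_offset (int64_t committed, int64_t low, int64_t high,
                             enum reset_policy policy) {
        if (committed != NO_COMMITTED_OFFSET &&
            committed >= low && committed <= high)
                return committed;  /* resume where the group left off */

        /* No usable committed offset: fall back to the reset policy. */
        return policy == RESET_EARLIEST ? low : high;
}
```

    With "earliest", as set in the listing, a brand-new group starts at the oldest stored message and therefore reads everything already in the partition; with "latest" it would only see messages produced after it joined.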

    3.2 Producer

    The librdkafka/examples directory contains producer.c, a sample program that shows how to produce to Kafka in C. Its content is as follows.

    /**
     * Simple Apache Kafka producer
     * using the Kafka driver from librdkafka
     * (https://github.com/edenhill/librdkafka)
     */
    
    #include <stdio.h>
    #include <signal.h>
    #include <string.h>
    
    
    /* Typical include path would be <librdkafka/rdkafka.h>, but this program
     * is builtin from within the librdkafka source tree and thus differs. */
    #include "rdkafka.h"
    
    
    static volatile sig_atomic_t run = 1;
    
    /**
     * @brief Signal termination of program
     */
    static void stop (int sig) {
            run = 0;
            fclose(stdin); /* abort fgets() */
    }
    
    
    /**
     * @brief Message delivery report callback.
     *
     * This callback is called exactly once per message, indicating if
     * the message was successfully delivered
     * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
     * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
     *
     * The callback is triggered from rd_kafka_poll() and executes on
     * the application's thread.
     */
    static void dr_msg_cb (rd_kafka_t *rk,
                           const rd_kafka_message_t *rkmessage, void *opaque) {
            if (rkmessage->err)
                    fprintf(stderr, "%% Message delivery failed: %s\n",
                            rd_kafka_err2str(rkmessage->err));
            else
                    fprintf(stderr,
                            "%% Message delivered (%zd bytes, "
                            "partition %"PRId32")\n",
                            rkmessage->len, rkmessage->partition);
    
            /* The rkmessage is destroyed automatically by librdkafka */
    }
    
    
    
    int main (int argc, char **argv) {
            rd_kafka_t *rk;         /* Producer instance handle */
            rd_kafka_conf_t *conf;  /* Temporary configuration object */
            char errstr[512];       /* librdkafka API error reporting buffer */
            char buf[512];          /* Message value temporary buffer */
            const char *brokers;    /* Argument: broker list */
            const char *topic;      /* Argument: topic to produce to */
    
            /*
             * Argument validation
             */
            if (argc != 3) {
                    fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
                    return 1;
            }
    
            brokers = argv[1];
            topic   = argv[2];
    
    
            /*
             * Create Kafka client configuration place-holder
             */
            conf = rd_kafka_conf_new();
    
            /* Set bootstrap broker(s) as a comma-separated list of
             * host or host:port (default port 9092).
             * librdkafka will use the bootstrap brokers to acquire the full
             * set of brokers from the cluster. */
            if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                    fprintf(stderr, "%s\n", errstr);
                    return 1;
            }
    
            /* Set the delivery report callback.
             * This callback will be called once per message to inform
             * the application if delivery succeeded or failed.
             * See dr_msg_cb() above.
             * The callback is only triggered from rd_kafka_poll() and
             * rd_kafka_flush(). */
            rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    
            /*
             * Create producer instance.
             *
             * NOTE: rd_kafka_new() takes ownership of the conf object
             *       and the application must not reference it again after
             *       this call.
             */
            rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
            if (!rk) {
                    fprintf(stderr,
                            "%% Failed to create new producer: %s\n", errstr);
                    return 1;
            }
    
            /* Signal handler for clean shutdown */
            signal(SIGINT, stop);
    
            fprintf(stderr,
                    "%% Type some text and hit enter to produce message\n"
                    "%% Or just hit enter to only serve delivery reports\n"
                    "%% Press Ctrl-C or Ctrl-D to exit\n");
    
            while (run && fgets(buf, sizeof(buf), stdin)) {
                    size_t len = strlen(buf);
                    rd_kafka_resp_err_t err;
    
                    if (buf[len-1] == '\n') /* Remove newline */
                            buf[--len] = '\0';
    
                    if (len == 0) {
                            /* Empty line: only serve delivery reports */
                            rd_kafka_poll(rk, 0/*non-blocking */);
                            continue;
                    }
    
                    /*
                     * Send/Produce message.
                     * This is an asynchronous call, on success it will only
                     * enqueue the message on the internal producer queue.
                     * The actual delivery attempts to the broker are handled
                     * by background threads.
                     * The previously registered delivery report callback
                     * (dr_msg_cb) is used to signal back to the application
                     * when the message has been delivered (or failed).
                     */
            retry:
                    err = rd_kafka_producev(
                            /* Producer handle */
                            rk,
                            /* Topic name */
                            RD_KAFKA_V_TOPIC(topic),
                            /* Make a copy of the payload. */
                            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                            /* Message value and length */
                            RD_KAFKA_V_VALUE(buf, len),
                            /* Per-Message opaque, provided in
                             * delivery report callback as
                             * msg_opaque. */
                            RD_KAFKA_V_OPAQUE(NULL),
                            /* End sentinel */
                            RD_KAFKA_V_END);
    
                    if (err) {
                            /*
                             * Failed to *enqueue* message for producing.
                             */
                            fprintf(stderr,
                                    "%% Failed to produce to topic %s: %s\n",
                                    topic, rd_kafka_err2str(err));
    
                            if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                    /* If the internal queue is full, wait for
                                     * messages to be delivered and then retry.
                                     * The internal queue represents both
                                     * messages to be sent and messages that have
                                     * been sent or failed, awaiting their
                                     * delivery report callback to be called.
                                     *
                                     * The internal queue is limited by the
                                     * configuration property
                                     * queue.buffering.max.messages */
                                    rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                                    goto retry;
                            }
                    } else {
                            fprintf(stderr, "%% Enqueued message (%zd bytes) "
                                    "for topic %s\n",
                                    len, topic);
                    }
    
    
                    /* A producer application should continually serve
                     * the delivery report queue by calling rd_kafka_poll()
                     * at frequent intervals.
                     * Either put the poll call in your main loop, or in a
                     * dedicated thread, or call it after every
                     * rd_kafka_produce() call.
                     * Just make sure that rd_kafka_poll() is still called
                     * during periods where you are not producing any messages
                     * to make sure previously produced messages have their
                     * delivery report callback served (and any other callbacks
                     * you register). */
                    rd_kafka_poll(rk, 0/*non-blocking*/);
            }
    
    
            /* Wait for final messages to be delivered or fail.
             * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
             * waits for all messages to be delivered. */
            fprintf(stderr, "%% Flushing final messages..\n");
            rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
    
            /* If the output queue is still not empty there is an issue
             * with producing messages to the clusters. */
            if (rd_kafka_outq_len(rk) > 0)
                    fprintf(stderr, "%% %d message(s) were not delivered\n",
                            rd_kafka_outq_len(rk));
    
            /* Destroy the producer instance */
            rd_kafka_destroy(rk);
    
            return 0;
    }
    
    

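    The overall flow is similar to the consumer's.

    Function call                     Meaning
    rd_kafka_conf_new()               Creates the configuration object
    rd_kafka_conf_set(…)              Sets a configuration property; here bootstrap.servers
    rd_kafka_conf_set_dr_msg_cb(…)    Registers the delivery report callback
    rd_kafka_new(…)                   Creates the Kafka producer (RD_KAFKA_PRODUCER)
    rd_kafka_producev(…)              Enqueues a message for sending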
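
    The producer listing combines two ideas: producing only enqueues a message, and delivery reports are served later from a poll call, which is also what frees queue space when the queue is full. The toy model below imitates that interplay without librdkafka. Every name in it (toy_producer, toy_produce, toy_poll, toy_produce_retry, count_delivery, QUEUE_CAP) is hypothetical, and it assumes for simplicity that every queued message has been delivered by the time poll runs.

```c
#include <stddef.h>

#define QUEUE_CAP 4  /* stands in for queue.buffering.max.messages */

typedef void (*delivery_cb) (int msg_id, void *opaque);

struct toy_producer {
        int queue[QUEUE_CAP];     /* ids awaiting a delivery report */
        size_t len;               /* number of ids currently queued */
        delivery_cb on_delivered; /* delivery report callback */
        void *opaque;             /* passed through to the callback */
};

/* Asynchronous enqueue: returns 0 on success, -1 if the queue is full. */
static int toy_produce (struct toy_producer *p, int msg_id) {
        if (p->len == QUEUE_CAP)
                return -1;
        p->queue[p->len++] = msg_id;
        return 0;
}

/* Serves the delivery report of every queued message and empties the
 * queue; returns the number of reports served. */
static int toy_poll (struct toy_producer *p) {
        size_t i, served = p->len;

        for (i = 0 ; i < served ; i++)
                if (p->on_delivered)
                        p->on_delivered(p->queue[i], p->opaque);
        p->len = 0;
        return (int)served;
}

/* Enqueue with the retry-on-full pattern used in the listing. */
static void toy_produce_retry (struct toy_producer *p, int msg_id) {
        while (toy_produce(p, msg_id) != 0)
                toy_poll(p); /* make room, then try again */
}

/* Example callback: counts reports in the int that `opaque` points to. */
static void count_delivery (int msg_id, void *opaque) {
        (void)msg_id;
        ++*(int *)opaque;
}
```

    The point of the model is that no callback fires until poll is called, which is why the real producer calls rd_kafka_poll() after every produce and rd_kafka_flush() before exiting.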

    3.3 Producer and consumer working together

    (1) Start the consumer.

    ./consumer localhost:9092 0 test
    

    It prints:

    % Subscribed to 1 topic(s), waiting for rebalance and messages...
    

    (2) Start the producer.

    ./producer localhost:9092 test
    

    It prints:

    % Type some text and hit enter to produce message
    % Or just hit enter to only serve delivery reports
    % Press Ctrl-C or Ctrl-D to exit
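
    Ctrl-C works because both programs install a SIGINT handler that clears a run flag checked by the main loop. The sketch below isolates that pattern; install_stop_handler is a hypothetical wrapper added for illustration. Unlike the producer's handler it does not call fclose(stdin), since fclose() is not among the functions POSIX lists as safe to call from a signal handler; for a loop that polls with a short timeout, as the consumer does, setting the flag alone is enough.

```c
#include <signal.h>

static volatile sig_atomic_t run = 1;

/* Signal handler: it only sets the flag, which is safe to do from a
 * handler because `run` is a volatile sig_atomic_t. */
static void stop (int sig) {
        (void)sig;
        run = 0;
}

/* Installs the handler for SIGINT; returns 0 on success, -1 on error. */
static int install_stop_handler (void) {
        return signal(SIGINT, stop) == SIG_ERR ? -1 : 0;
}
```

    A main loop written as while (run) { ... } then exits at its next iteration after Ctrl-C is pressed.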
    
    

    (3) Communication.
    The producer sends "hello consumer":

    $ ./producer localhost:9092 test
    % Type some text and hit enter to produce message
    % Or just hit enter to only serve delivery reports
    % Press Ctrl-C or Ctrl-D to exit
    hello consumer
    % Enqueued message (14 bytes) for topic test
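
    The 14 bytes reported here are the length of "hello consumer" after the trailing newline that fgets() keeps has been removed. A minimal sketch of that step follows; strip_newline is a hypothetical helper, and unlike the listing it also guards against an empty buffer before looking at the last character.

```c
#include <string.h>

/* Removes one trailing '\n' from `buf` if present and returns the
 * resulting length. An empty string is left untouched. */
static size_t strip_newline (char *buf) {
        size_t len = strlen(buf);

        if (len > 0 && buf[len - 1] == '\n')
                buf[--len] = '\0';

        return len;
}
```

    A returned length of 0 corresponds to the "just hit enter" case, in which the producer only serves delivery reports.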
    
    

    The consumer receives it:

    $ ./consumer localhost:9092 0 test
    % Subscribed to 1 topic(s), waiting for rebalance and messages...
    Message on test [0] at offset 4:
     Value: hello consumer
    
    

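    Summary

    1. Within a consumer group, a partition is read by only one consumer. If a topic has a single partition and several consumers of the same group subscribe to it, only one of them receives data, so starting multiple consumers of one group for a single partition gains nothing.
    2. Get familiar with the consumer concepts and with writing producers and consumers in C/C++.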
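
    The first point can be checked with a little arithmetic. The sketch below splits the partitions of one topic among the members of a group so that the first members receive the remainder, in the spirit of a range-style assignment; partitions_for_consumer is a hypothetical name, and the actual result in Kafka depends on the configured partition.assignment.strategy.

```c
/* Number of partitions that consumer `idx` (0-based) receives when
 * `partition_cnt` partitions of one topic are split among
 * `consumer_cnt` members of the same group. Returns 0 on bad input. */
static int partitions_for_consumer (int partition_cnt, int consumer_cnt,
                                    int idx) {
        if (partition_cnt < 0 || consumer_cnt < 1 ||
            idx < 0 || idx >= consumer_cnt)
                return 0;

        /* Everyone gets the quotient; the first few get one extra. */
        return partition_cnt / consumer_cnt +
               (idx < partition_cnt % consumer_cnt ? 1 : 0);
}
```

    For the topic test created earlier (one partition) and three consumers in group 0, the counts come out as 1, 0, and 0: two of the three consumers stay idle.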

  • Original article: https://blog.csdn.net/Long_xu/article/details/128073646