• Producing and consuming Kafka messages


    Original articles:
    Kafka Tutorial: Creating a Kafka Producer in Java (cloudurable.com)
    Kafka Tutorial: Creating a Kafka Consumer in Java (cloudurable.com)

    Following on from the previous post where we set up Kafka, we will now use Java to send messages to Kafka and consume them.

    First we need to add the Kafka dependency:
    Adding kafka-clients is enough; its transitive jars will be downloaded automatically.

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.2</version>
            </dependency>


    The first step is to create a Producer. To create one, we need the bootstrap servers of the Kafka cluster we are connecting to and the topic we will send messages to. Here we assume a cluster of three brokers: "localhost:9092,localhost:9093,localhost:9094"

    We will mock a topic: "my-example-topic"
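    The later snippets reference `BOOTSTRAP_SERVERS` and `TOPIC` without showing their declarations; a minimal sketch of those constants (the holder class name is assumed here):

```java
public class KafkaConstants {
    // Assumed holder class; the producer/consumer snippets below reference these names.
    public static final String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    public static final String TOPIC = "my-example-topic";
}
```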

    Below is the code that creates the KafkaProducer.

    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
        // Keys are longs and values are strings, so pick the matching serializers.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }


    Serialization comes in here: both the key and the value must be serialized to bytes before Kafka can send them over the wire and persist them to disk. The serializer classes we use are LongSerializer (key) and StringSerializer (value).
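    For reference, LongSerializer emits the key as 8 big-endian bytes and StringSerializer emits UTF-8 bytes. A stdlib-only sketch of those encodings (no Kafka dependency; just to show what lands on the wire):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializeSketch {
    // Mirrors what LongSerializer produces: 8 big-endian bytes.
    static byte[] serializeLong(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Mirrors what StringSerializer produces: UTF-8 bytes.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(serializeLong(42L).length);             // 8
        System.out.println(serializeString("Hello World").length); // 11
    }
}
```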

    The second step is to send messages to the topic, using the following code:

    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello World " + index);
                // send() is asynchronous; get() blocks until the broker acknowledges the record.
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }


    Then run it from a main method.
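    A possible entry point (a sketch; it assumes the methods above live in the same class and the broker cluster is running):

```java
public static void main(String... args) throws Exception {
    if (args.length == 0) {
        runProducer(5);  // send five "Hello World" records by default
    } else {
        runProducer(Integer.parseInt(args[0]));
    }
}
```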


    Next we write the consumer side.

    Step 1: likewise, we need a KafkaConsumer, defined as follows:

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        // Must mirror the producer's serializers: long keys, string values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }


    Mirroring the producer side, we use LongDeserializer and StringDeserializer to deserialize the key and the value.
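    Symmetrically, LongDeserializer reinterprets the 8 big-endian key bytes as a long. A stdlib-only sketch of that round trip (no Kafka dependency):

```java
import java.nio.ByteBuffer;

public class DeserializeSketch {
    // Mirrors LongDeserializer: read 8 big-endian bytes back into a long.
    static long deserializeLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // Encode 123L the way the producer's LongSerializer would, then decode it.
        byte[] wire = ByteBuffer.allocate(Long.BYTES).putLong(123L).array();
        System.out.println(deserializeLong(wire)); // 123
    }
}
```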

    Step 2: with the KafkaConsumer in place, we can consume messages.

    static void runConsumer() {
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;
        while (true) {
            // Block for up to 1000 ms waiting for records; give up after 100 empty polls.
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)%n",
                        record.key(), record.value(), record.partition(), record.offset());
            });
            // Commit the offsets of the records just processed.
            consumer.commitSync();
        }
        consumer.close();
        System.out.println("Done.");
    }

    Now that we have a producer, a consumer, and a Kafka server, we can run a loopback test.
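    With both halves in place, the loopback can be driven from a single main (again a sketch that assumes both methods live in one class and the cluster is up):

```java
public static void main(String... args) throws Exception {
    runProducer(5);  // publish 5 records to my-example-topic
    runConsumer();   // read them back, print each one, then exit after the idle give-up
}
```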



    Consumer output: (screenshot omitted)

  • Source: https://blog.csdn.net/xiaofeixia22222/article/details/126562781