• kafka入门(一):kafka消息发送与消费


    安装kafka,创建 topic:

    Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

    Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

    添加依赖包:

            
                org.springframework.kafka
                spring-kafka
                2.1.10.RELEASE
            
            
                org.apache.kafka
                kafka-clients
                2.0.0
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    kafka代码示例(一):

    主要按照以下步骤:

    • 设置 broker服务器的ip和端口, 设置 消费者群组id

    • 初始化消费者

    • 消费者订阅主题

    • 消费者批量拉取消息

    public class KafkaDemo1 {
        public static final String BROKER_LIST = "localhost:9092";
        public static final String TOPIC = "myTopic1";
        public static final String GROUP_ID = "group.demo";
    
        public static void main(String[] args) {
            consumerRecord();
        }
    
        public static void consumerRecord() {
            //属性配置
            Properties properties = getProperties(BROKER_LIST, GROUP_ID);
            //消费者初始化
            KafkaConsumer consumer = new KafkaConsumer<>(properties);
            //消息者订阅主题
            consumer.subscribe(Collections.singletonList(TOPIC));
            //循环
            while (true) {
                //每次拉取 1千条消息
                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord record : records) {
                    System.out.println("=============> 消费kafka消息:"+ record.value());
                }
            }
        }
    
        public static Properties getProperties(String brokerList, String groupId) {
            Properties properties = new Properties();
            //序列化
            properties.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            //broker服务器的ip和端口,多个用逗号隔开
            properties.put("bootstrap.servers", brokerList);
            //消费者群组id
            properties.put("group.id", groupId);
            return properties;
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
    在这里插入图片描述

    观察idea 控制台,可以看到 成功消费了消息:

    =============> 消费kafka消息:hello kafka
    
    • 1

    参考资料:

    《深入理解kafka 核心设计与实践原理》

  • 相关阅读:
    相关性网络图
    现货白银入门技巧:心理线
    通俗讲解傅里叶变换
    PyQt5 | 手把手教你YOLOv5添加PyQt页面 | 3/3
    Qt (QFileDialog&QColorDialog&QFontDialog) 对话框实战
    EasyExcel完成excel文件的导入导出
    k8s中的端口hostPort、port、nodePort、targetPort
    基金客户服务
    web大作业 静态网页(地下城与勇士 10页 带视频)
    SSM框架快速搭建(一)
  • 原文地址:https://blog.csdn.net/sinat_32502451/article/details/134484808