• 主题配置和 消息发送(一)KafkaTemplate 的使用


    一、主题

    1.1、配置主题

    • 在应用程序上下文定义一个 KafkaAdmin Bean, 它可以自动将主题添加到代理。通过这个Bean可以将
      每一个新建的主题 Topic 添加到应用程序上下文中。下面是一个简单的示例:

    也可以创建 TopicBuilder 类,使用它创建 Bean 更加简单。

    @Bean
    public KafkaAdmin admin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return new KafkaAdmin(configs);
            }
    
    @Bean
    public KafkaAdmin.NewTopics topics456() {
            return new NewTopics(
            TopicBuilder.name("defaultBoth")
            .build(),
            TopicBuilder.name("defaultPart")
            .replicas(1)
            .build(),
            TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build());
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    使用 Spring Boot 时,KafkaAdminbean 会自动注册
    默认情况下,代理不可用时会记录一条消息,然后上下文会继续加载。可以以编程方式调用Admin的initialize()方法以稍后重试。
    也可将 admin 的fatalIfBrokerNotAvailable属性设置为true。然后上下文无法初始化。

    1.2、在运行时检查和创建主题

    目前有两种方法来进行操作:

    • createOrModifyTopics
    • describeTopics
      或者使用 AdminClient 来直接使用:
    @Autowired
    private KafkaAdmin admin;
    
    ...
    
        AdminClient client = AdminClient.create(admin.getConfigurationProperties());
        ...
        client.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    二、消息发送

    2.1、使用 KafkaTemplate

    2.1.1、KafkaTemplate 介绍

    KafkaTemplate 包装了生产者并提供了将数据发送到 Kafka 主题的便捷方法。

    2.1.2、配置 KafkaTemplate

    要使用模板,需要配置生产者工厂并在模板的构造函数中提供。

    • 单个生产者配置
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 多个生产者配置

      使用来自同一工厂的不同生产者配置创建模板,需要覆盖工厂的ProducerConfig属性。

    @Bean
    public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
    }
    
    @Bean
    public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
    Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后调用 KafkaTemplate 的方法来使用它。

    • 异步消息发布示例
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<String, String> record = createRecord(data);
    
        ListenableFuture<SendResult<Integer, String>> future = template.send(record);
        future.addCallback(new KafkaSendCallback<SendResult<Integer, String>>() {
    
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(data);
            }
    
            @Override
            public void onFailure(KafkaProducerException ex) {
                handleFailure(data, record, ex);
            }
    
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 阻塞发布示例
    public void sendToKafka(final MyOutputData data) {
        final ProducerRecord<String, String> record = createRecord(data);
    
        try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
        }
        catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
        }
        catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    ExecutionException 在于 KafkaProducerException 属性 failedProducerRecord 中

    2.1.3、发布结果查看

    • 异步

    发布成功还是失败可以向侦听器注册回调以异步接收发送结果:

    ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
            future.addCallback(new KafkaSendCallback<Integer, String>() {
    
    @Override
    public void onSuccess(SendResult<Integer, String> result) {
            ...
            }
    
    @Override
    public void onFailure(KafkaProducerException ex) {
            ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
            ...
            }
    
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    或者使用 lambda:

    ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
    future.addCallback(result -> {
            ...
        }, (KafkaFailureCallback<Integer, String>) ex -> {
                ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
                ...
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 同步

    阻塞发送线程等待结果需要调用 future 的 get()方法,可以使用带超时的 get() 方法。

  • 相关阅读:
    2652. 倍数求和
    带你深入了解微信小程序【授权登录】
    Java while循环语句使用详解说明
    SQL学习(基础)
    3. Python 变量和赋值
    UNIAPP实战项目笔记32 购物车无商品默认样式
    国际短信平台哪家好?
    Sublime Text Mac/Win中文版:代码编辑器的卓越典范
    HJ23 删除字符串中出现次数最少的字符
    介绍 TensorFlow 的基本概念和使用场景
  • 原文地址:https://blog.csdn.net/qq_35241329/article/details/132762468