• Kafka(三)生产者发送消息


    生产者发送思路

    如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

    1. ack使用默认的all
    2. 开启重试
    3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
    4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
    5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

    自定义序列化类

    消息格式为JSON, 使用Jackson将类序列化为JSON字符串

    public class UserDTOSerializer implements Serializer<UserDTO> {
        
        @Override
        @SneakyThrows
        public byte[] serialize(final String s, final UserDTO userDTO) {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsBytes(userDTO);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    配置生产者参数

    有几点需要注意

    1. 开启压缩
    2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
    3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
    4. ack使用默认的all
        /**
         * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整
         * 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致
         * @return
         */
        public static Properties loadProducerConfig(String valueSerializer) {
            Properties result = new Properties();
            result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");
            result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
            result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量
            // 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景
            result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);
            // 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小
            result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);
            // 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量
            result.put(ProducerConfig.LINGER_MS_CONFIG, 10);
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    提升吞吐量

    • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
    • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
    • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

    发送消息

    @Log
    public class MessageProducer {
        
        public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));
        
        private MessageFailedService messageFailedService = new MessageFailedService();
    
        /**
         * kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟
         * callback函数只有在最后一次重试之后才会调用, 如果你想在本地测试Kafka生产者的重试,详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm
         * @param userDTO
         */
        public void sendMessage(final UserDTO userDTO) {
            ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);
            try {
                PRODUCER.send( user, (recordMetadata, e) -> {
                    if (Objects.nonNull(e)) {
                        log.severe("message has sent failed");
                        MessageFailedEntity messageFailedEntity = new MessageFailedEntity();
                        messageFailedEntity.setMessageId(userDTO.getMessageId());
                        ObjectMapper mapper = new ObjectMapper();
                        try {
                            messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));
                        } catch (JsonProcessingException jsonProcessingException) {
                            log.severe("message content json format failed");
                        }
                        messageFailedEntity.setMessageType(MessageType.EMAIL);
                        messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);
                        messageFailedEntity.setFailedReason(e.getMessage());
                        // 如果sendMessage传进来的是个list,也同理,不能放到list.foreach外面
                        // 如果放在主线程里,由于kafka producer是异步的,
                        // kafka producer的执行速度可能慢于主线程,可能拿到的值是空的是有问题的,例如拿到的failedReason是空的
                        messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);
                    } else {
                        log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );
                    }
                });
            } catch (TimeoutException e) {
                log.info("send message to kafka timeout, message: ");
                // TODO: 自定义逻辑,比如发邮件通知kafka管理员
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    对上述代码做几点解释

    1. 我们使用异步的方式发送,如果发送成功,打印一条消息
    2. 关键在于重试,callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback, 这个问题我发邮件问过社区, 详情见这里的 邮件

    关闭生产者

    实现ServletContextListener接口, 然后在web.xml的listener元素中配置

    public class KafkaListener implements ServletContextListener {
    
        private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();
    
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);
        }
    
        @Override
        public void contextDestroyed(ServletContextEvent sce) {
            KAFKA_PRODUCERS.forEach(KafkaProducer::close);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    
    <web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                          https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
             version="6.0">
    
      <listener>
        <listener-class>com.business.server.listener.KafkaListenerlistener-class>
      listener>
    web-app>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    结语

    1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
    2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
    3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

    示例源码仓库

    1. Github地址
    2. 项目下business-server module代表生产者
    3. 运行时IDEA配置如下在这里插入图片描述
      注意Application context的路径, 启动之后访问端口+Application context, 例如
    http://localhost:8999/business-server
    
    • 1

    下一篇博文,将介绍消费者消费消息,以及消费者的重要参数配置,还有消费逻辑的重试机制等。

  • 相关阅读:
    路由规则——MVC控制器
    @RequestMapping运用举例(有源码) 前后端如何传递参数?后端如何接收前端传过来的参数,传递单个参数,多个参数,对象,数组/集合(有源码)
    Maven
    Java并发编程第8讲——ThreadLocal详解
    为什么yolov8val的dfl_loss一直上升呢
    3Darray 修改array值然后保存图片
    牛客网专项练习30天Pytnon篇第01天
    neo4j 删除从一个node开始的所有数据。
    说一下CSS浮动
    大学英语四级考试核心高频词汇突破
  • 原文地址:https://blog.csdn.net/dghkgjlh/article/details/134360108