• Kafka生产者之分区


    一、分区好处

    (1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果;

    (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据;

    在这里插入图片描述


    二、分区策略

    在IDEA中全局查找(ctrl +n)ProducerRecord类,在类中可以看到如下构造方法:
    在这里插入图片描述


    (1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0

    (2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
    例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那
    么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

    (3)既没有partition值又没有key值的情况下Kafka采用Sticky Partition(黏性分区器)会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
    例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。



    三、自定义分区器

    如果研发人员可以根据企业需求,自己重新实现分区器;

    1)需求

    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区;

    2)实现步骤

    2.1)定义类实现 Partitioner 接口
    2.2)重写 partition()方法
    /**
    * 1. 实现接口 Partitioner
    * 2. 实现 3 个方法:partition,close,configure
    * 3. 编写 partition 方法,返回分区号
    */
    public class MyPartitioner implements Partitioner {
    /**
     * 返回信息对应的分区
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
     @Override
     public int partition(String topic, Object key, byte[] 
    keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     // 获取消息
     String msgValue = value.toString();
     // 创建 partition
     int partition;
     // 判断消息是否包含 atguigu
     if (msgValue.contains("atguigu")){
     partition = 0;
     }else {
     partition = 1;
     }
     // 返回分区号
     return partition;
     }
     // 关闭资源
     @Override
     public void close() {
     }
     // 配置方法
     @Override
     public void configure(Map<String, ?> configs) {
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    2.3)使用分区器的方法,在生产者的配置中添加分区器参数
    // 添加自定义分区器
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
    gu.kafka.producer.MyPartitioner");
    
    • 1
    • 2
    • 3
  • 相关阅读:
    鸿蒙HarmonyOS实战-Web组件(请求响应和页面调试)
    【数学】【矩阵】迹(Trace)及相关性质
    【Mybatis编程:插入和根据id删除相册数据】
    deequ 2.0.1 VerificationSuite 检测项执行过程简要分析
    【双指针-简单】977. 有序数组的平方
    【MLT】MLT多媒体框架生产消费架构解析(三)
    上万篇笔记总结丨小红书品牌高频投放的12种经典笔记形式
    【ROS入门】机器人导航(仿真)——导航实现
    “岗课赛证”融通的物联网综合实训室建设方案
    【PostgreSQL内核学习(十四)—— (PortalRunMulti 和 PortalRunUtility)】
  • 原文地址:https://blog.csdn.net/G823909/article/details/128054254