• 【Kafka源码分析】二、服务端Server


    一. 消息存储文件概念

    Kafka用Topic将数据划分成内聚性较强的子集,Topic内部又划分成多个Partition。不过这两个都是逻辑概念,真正存储文件的是Partition所对应的一个或多个Replica,即副本。在存储层有个概念和副本一一对应——Log。为了防止Log过大,增加消息过期和数据检索的成本,Log又会按一定大小划分成"段",即LogSegment。下图为这些概念间的关系:
    在这里插入图片描述
    在Server端创建Topic后,会为topic生成log目录,目录名为 Topic-PartitionIndex,如 lao-zhang-tou-topic-0。在这个目录下,会生成多个文件,如下:
    在这里插入图片描述
    该目录下,名称相同的.log文件、.index文件、.timeindex文件构成了一个LogSegment。例如图中的 00000000000000000000.log、00000000000000000000.index、00000000000000000000.timeindex 三个文件。其中.log是数据文件,用于存储消息数据;.index和.timeindex是在.log基础上建立起来的索引文件

    二. 消息存储文件内容格式

    .log文件:

    log文件将消息数据依次排开进行存储
    在这里插入图片描述
    每个Message内部分为"数据头"(LOG_OVERHEAD)和"数据体"(Record)两部分
    在这里插入图片描述其中,LOG_OVERHEAD包含两个字段:

    1. offset:每条数据的逻辑偏移量,按插入顺序分别为0、1、2… … N;每个消息的offset在Partition内部是唯一的;
    2. size:数据体(RECORD)部分的长度;

    RECORD内部格式如下:
    在这里插入图片描述
    • crc32:校验码,用于验证数据完整性;
    • magic:消息格式的版本号;v0=0,v1=1;本文讲v1格式;
    • timestamp:时间戳,具体业务含义依attributes的值而定;
    • attributes:属性值;
    • keyLength:key值的长度;
    • key:消息数据对应的key;
    • valueLength:value值的长度;
    • value:消息体,承载业务信息;

    .index和.timeindex文件

    .index文件是依offset建立其的稀疏索引,可减少通过offset查找消息时的遍历数据量。.index文件的每个索引条目占8 bytes,有两个字段:relativeOffset 和 position(各占4 bytes)。也就是消息逻辑偏移量offset到其在文件中物理偏移量的一个映射

    relativeOffset指的的相对偏移量,是对LogSegment基准offset而言的。我们注意到,一个LogSegment内的.log文件、.index文件、和.index文件除后缀外的名称都是相同的。其实这个名称就是该LogSegment的基准offset,即LogSegment内保存的第一条消息对应的offset。baseOffset + relativeOffset即可得到offset,所以称索引项是offset到物理偏移量的映射。

    不是所有的消息都对应.index文件内的一个条目。Kafka会每隔一定量的消息才会在.index建立索引条目,间隔大小由"log.index.interval.bytes"配置指定。.index文件布局示意图如下:
    在这里插入图片描述
    .timeindex文件和.index原理相同,只不过其IndexEntry的两个字段分别为timestamp(8 bytes)和relativeOffset(4 bytes)。用于减少以时间戳查找消息时遍历元素数量

    在这里插入图片描述

    三.日志文件清理策略

    Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间
    • log.retention.hours,最低优先级小时,默认 7天。
    • log.retention.minutes,分钟。
    • log.retention.ms,最高优先级毫秒。
    • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

    日志超过了设置的时间,有两种清理策略:

    • delete:过期数据删除(当segment文件中有一部分数据过期,一部分没有过期的时候,会以该文件中的最大时间戳作为过期时间,如果最大过期时间没有达到要清理的时间的话则不用清理,等待下次检查过期文件的来进行清理)
    • compact:对于相同key的不同value值,只保留最后一个版本。
      在这里插入图片描述
      压缩后的offset可能是不连续的,这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

    • 配置方式:

    log.cleanup.policy = delete 
    log.cleanup.policy = compact 
    
    • 1
    • 2

    四.高性能IO

    Kafka的吞吐量非常高,一个非常重要的原因就是它采用了高效的IO方法

    1. 顺序IO

    Kafka在顺序IO上的设计分两方面看:

    1. LogSegment创建时,一口气申请LogSegment最大size的磁盘空间,这样一个文件内部尽可能分布在一个连续的磁盘空间内
    2. .log文件也好,.index和.timeindex也罢,在设计上都是只追加写入,不做更新操作,这样避免了随机IO的场景;

    不要创建太多的Partition!!!:Kafka集群能承载的Partition数量有上限。很大一部分原因是Partition数量太多会抹杀掉Kafka顺序IO设计带来的优势,相当于自废武功。因为不同Partition在磁盘上的存储位置可不保证连续,当以不同Partition为读写目标并发地向Kafka发送请求时,Server端近似于随机IO。

    2. 零拷贝+PageCache页缓存
    • 零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
    • PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用
      在这里插入图片描述
      非零拷贝流程:生产者生产消息,将消息发送到kafka集群,接着kafka会将消息内容交给linux内核进行处理,linux内核会将数据缓存起来一份,放到页缓存中,最后由linux内核将数据存储到磁盘中,当消费者消费消息的时候,kafka会先去页缓存中找,页缓存如果找不到的话,会去磁盘中读取,返回消息信息,同时也会在页缓存中存储一份数据,当kafka向消费者发送数据的时候(有可能kafka和消费者不在一台服务器上,需要跨节点通信,跨节点通讯需要网卡), kafka会先将数据先发送到linux内核中的socket缓存中,再由socket缓存将数据通过网卡发送到消费者。

    在非零拷贝的过程中一共经历了4次拷贝数据的过程:

    1. 从磁盘文件到linux内核页缓存
    2. 从内核页缓存到kafka
    3. 接着从kafka到内核的socket缓存
    4. 再从socket缓存到网卡。

    从磁盘文件到linux内核,从socket缓存到网卡,是属于DMA拷贝(直接存储器访问),不经过cpu,消耗的资源和时间比较小
    从内核页缓存到kafka,接着从kafka到内核的socket缓存,是属于cpu拷贝,而且需要在用户态和内核态之间来回切换,会消耗大量的资源和时间。

    零拷贝时broker应用层不操作数据,所以不需要将数据拷贝到broker应用层,可以直接由页面缓存通过网卡将数据发送到消费者
    在零拷贝的过程中一共经历了2次拷贝数据的过程,从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡。
    从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡,不经过cpu,消耗的资源和时间比较小,而且避免了需要在用户态和内核态之间来回切换,从而节省了大量的资源和时间。

    3. 端到端压缩:

    一条压缩消息从生产者处发出后,其在消费者处才会被解压。Kafka Server端不会尝试解析消息体,直接原样存储,省掉了Server段压缩&解压缩的成本,这也是Kafka吞吐量高的原因之一。

    五.消息存储核心类

    在这里插入图片描述

    • 数据传递对象:Kafka消息存储的基本单位不是"一条消息",而是"一批消息"。Producer针对每个Partition会攒一批消息,经过压缩后发到Server端。Server端会将对应Partition下的这一"批"消息作为一个整体进行管理。所以在Server端,一个"Record"表示"一批消息",而数据传递对象"XXXRecords"则可以表示一批或多批消息。
      • MemoryRecords:内存消息数据。如Server端接到的生产者消息,未写入磁盘前暂存于内存
      • FileRecords:磁盘消息数据。如从磁盘读出消息返回给消费者的过程就用FileRecords来传递数据
    • ReplicaManager:负责管理本节点存储的所有Partition及其副本。核心属性allPartitions用于存储所有Partition对象,可根据TopicPartition类将其检索出来
    • Partition:即分区对象,管理该分区下的所有副本:
      • allReplicasMap:Pool[Int, Replica],key为BrokerId,value为Replica对象
      • leaderReplicaIdOpt:Leader副本所在节点的BrokerId
      • localBrokerId:本节点对应的BrokerId
    • Replica:负责维护Log对象。Replica是业务模型层面"副本"的表示,Log是数据存储层面的"副本"。
    • Log:负责维护副本下的所有LogSegment。核心属性segments:ConcurrentSkipListMap[java.lang.Long, LogSegment],key为对应LogSegment的起始offset
    • LogSegment:实际消息数据存储对象(默认大小为1G)。内部包含该日志段的索引offsetIndex、timeIndex
    • OffsetIndex和TimeIndex:对应上文提到的索引文件

    六、消息写入流程

    Server端的代码是用Scala写的,消息写入的主要流程如下
    在这里插入图片描述

    1. ReplicaManager.appendRecords:
      a. 检查目标Topic是否为Kafka内部Topic,若是的话根据配置决定是否允许写入;
      b. 获取对应的Partition对象;
      c. 调用Partition.appendRecordsToLeader写入消息数据
    2. Partition.appendRecordsToLeader
      a. 判断Leader副本是否在当前节点;(若本节点不是目标Partition的Leader副本, 抛异常)
      b. 获取Log对象;
      c. 调用Log对象的appendAsLeader方法写入数据;
    3. Log.appendAsLeader
      a. 判断是否需要创建一个新的LogSegment,并返回最新的LogSegment;
      b. 调用LogSegment.append方法写入数据;
    4. LogSegment.append
      a. 数据校验
      b. 校验输入消息大小;
      c. 校验offset;
      d. 写入数据(注意: 此步的log对象不是Log类的实例,而是FileRecords的实例);
      e. 更新统计数据;
      f. 处理索引;
    def append(largestOffset: Long,
                 largestTimestamp: Long,
                 shallowOffsetOfMaxTimestamp: Long,
                 records: MemoryRecords): Unit = {
        // step1.1 判断输入消息大小
        if (records.sizeInBytes > 0) {
          trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
                s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
          // step1.2 校验offset
          val physicalPosition = log.sizeInBytes()
          if (physicalPosition == 0)
            rollingBasedTimestamp = Some(largestTimestamp)
    
          ensureOffsetInRange(largestOffset)
    
          // step2 append the messages,这里的log是FileRecords的实例
          val appendedBytes = log.append(records)
          trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
          // step3 Update the in memory max timestamp and corresponding offset.
          if (largestTimestamp > maxTimestampSoFar) {
            maxTimestampSoFar = largestTimestamp
            offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
          }
          // step4 append an entry to the index (if needed)
          if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            offsetIndex.append(largestOffset, physicalPosition)
            timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
            bytesSinceLastIndexEntry = 0
          }
          bytesSinceLastIndexEntry += records.sizeInBytes
        }
      }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    参考文献:https://zhuanlan.zhihu.com/p/371361083
    https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343

  • 相关阅读:
    Js逆向教程-16极验滑块 找到w加密位置
    【入门必看】如何快速学一门新语言?
    深入理解mysql执行的底层机制
    【CSP-J/S初赛知识点整理】
    【JVM笔记】方法区的内部结构与运行时常量池
    数据挖掘技术-转换字符串时间为标准时间
    sql server 查询某个字段是否有值 返回bool类型
    Elasticsearch 认证模拟题 - 6
    【软考-中级】系统集成项目管理工程师 【15 信息 (文档) 和配置管理】
    no identity-based policy allows the cloudformation:CreateStack action
  • 原文地址:https://blog.csdn.net/wanger61/article/details/126056167