Kafka用Topic将数据划分成内聚性较强的子集,Topic内部又划分成多个Partition。不过这两个都是逻辑概念,真正存储文件的是Partition所对应的一个或多个Replica,即副本。在存储层有个概念和副本一一对应——Log。为了防止Log过大,增加消息过期和数据检索的成本,Log又会按一定大小划分成"段",即LogSegment。下图为这些概念间的关系:

在Server端创建Topic后,会为topic生成log目录,目录名为 Topic-PartitionIndex,如 lao-zhang-tou-topic-0。在这个目录下,会生成多个文件,如下:

该目录下,名称相同的.log文件、.index文件、.timeindex文件构成了一个LogSegment。例如图中的 00000000000000000000.log、00000000000000000000.index、00000000000000000000.timeindex 三个文件。其中.log是数据文件,用于存储消息数据;.index和.timeindex是在.log基础上建立起来的索引文件。
log文件将消息数据依次排开进行存储

每个Message内部分为"数据头"(LOG_OVERHEAD)和"数据体"(Record)两部分
其中,LOG_OVERHEAD包含两个字段:
RECORD内部格式如下:

• crc32:校验码,用于验证数据完整性;
• magic:消息格式的版本号;v0=0,v1=1;本文讲v1格式;
• timestamp:时间戳,具体业务含义依attributes的值而定;
• attributes:属性值;
• keyLength:key值的长度;
• key:消息数据对应的key;
• valueLength:value值的长度;
• value:消息体,承载业务信息;
.index文件是依offset建立其的稀疏索引,可减少通过offset查找消息时的遍历数据量。.index文件的每个索引条目占8 bytes,有两个字段:relativeOffset 和 position(各占4 bytes)。也就是消息逻辑偏移量offset到其在文件中物理偏移量的一个映射。
relativeOffset指的的相对偏移量,是对LogSegment基准offset而言的。我们注意到,一个LogSegment内的.log文件、.index文件、和.index文件除后缀外的名称都是相同的。其实这个名称就是该LogSegment的基准offset,即LogSegment内保存的第一条消息对应的offset。baseOffset + relativeOffset即可得到offset,所以称索引项是offset到物理偏移量的映射。
不是所有的消息都对应.index文件内的一个条目。Kafka会每隔一定量的消息才会在.index建立索引条目,间隔大小由"log.index.interval.bytes"配置指定。.index文件布局示意图如下:

.timeindex文件和.index原理相同,只不过其IndexEntry的两个字段分别为timestamp(8 bytes)和relativeOffset(4 bytes)。用于减少以时间戳查找消息时遍历元素数量。

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间
• log.retention.hours,最低优先级小时,默认 7天。
• log.retention.minutes,分钟。
• log.retention.ms,最高优先级毫秒。
• log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
日志超过了设置的时间,有两种清理策略:

• 配置方式:
log.cleanup.policy = delete
log.cleanup.policy = compact
Kafka的吞吐量非常高,一个非常重要的原因就是它采用了高效的IO方法
Kafka在顺序IO上的设计分两方面看:
不要创建太多的Partition!!!:Kafka集群能承载的Partition数量有上限。很大一部分原因是Partition数量太多会抹杀掉Kafka顺序IO设计带来的优势,相当于自废武功。因为不同Partition在磁盘上的存储位置可不保证连续,当以不同Partition为读写目标并发地向Kafka发送请求时,Server端近似于随机IO。

在非零拷贝的过程中一共经历了4次拷贝数据的过程:
从磁盘文件到linux内核,从socket缓存到网卡,是属于DMA拷贝(直接存储器访问),不经过cpu,消耗的资源和时间比较小
从内核页缓存到kafka,接着从kafka到内核的socket缓存,是属于cpu拷贝,而且需要在用户态和内核态之间来回切换,会消耗大量的资源和时间。
零拷贝时broker应用层不操作数据,所以不需要将数据拷贝到broker应用层,可以直接由页面缓存通过网卡将数据发送到消费者。
在零拷贝的过程中一共经历了2次拷贝数据的过程,从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡。
从磁盘文件到linux内核页缓存,再从linux内核页缓存到网卡,不经过cpu,消耗的资源和时间比较小,而且避免了需要在用户态和内核态之间来回切换,从而节省了大量的资源和时间。
一条压缩消息从生产者处发出后,其在消费者处才会被解压。Kafka Server端不会尝试解析消息体,直接原样存储,省掉了Server段压缩&解压缩的成本,这也是Kafka吞吐量高的原因之一。

Server端的代码是用Scala写的,消息写入的主要流程如下

def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
// step1.1 判断输入消息大小
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
// step1.2 校验offset
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
ensureOffsetInRange(largestOffset)
// step2 append the messages,这里的log是FileRecords的实例
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// step3 Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
// step4 append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
参考文献:https://zhuanlan.zhihu.com/p/371361083
https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343