Kafka 系列 存储架构

深入理解Kafka

Posted by lichao modified on September 10, 2020

日志文件

不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止Log过大,Kafka 又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。

事实上,Log和LogSegnient也不是纯粹物理意义上的概念, Log在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。

向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。为了方便描述,我们将最后一个LogSegment 称为“activeSegment”,即表示当前活跃的日志分段。

随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新activeSegment,之后追加的消息将写入新的activeSegment。为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index ”为文件后缀)和时间戳索引文件(以“.timeindex ”为文件后缀)。

每个LogSegment都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset 。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.logo

索引文件

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率:

  • 偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置
  • 时间戳索引文件则根据指定的时间戳( timestamp )来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数log.index.interval.bytes 指定,默认值为 4096 ,即 4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

日志分段文件切分包含以下几个条件,满足其一即可:

  1. 当前日志分段文件的大小超过了broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为1073741824 ,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.mslog.roll.hours 参数配置的值。如果同时配置了log.roll.mslog.roll.hours 参数,那么log.roll.ms 的优先级高。默认情况下,只配置了log.roll.hours 参数,其值为 168,即7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760 ,即 1OMB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset>Integer.MAX_VALUE)。

对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段(activeSegment)而言,索引文件还会追加更多的索引项,所以被设定为可读写。在索引文件切分的时候, Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,索引文件的大小由 broker 端参数 log.index.size.max.bytes 配置。Kafka 在创建索引文件的时候会为其预分配log.index.size.max.bytes 大小的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候, Kafka 才会把该索引文件裁剪到实际的数据大小。也就是说,与当前活跃的日志分段对应的索引文件的大小固定为 log.index.size.max.bytes ,而其余日志分段对应的索引文件的大小为实际的占用空间。

日志清理

Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个 Log ,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka 提供了两种日志清理策略。

  1. 日志删除(Log Retention): 按照一定的保留策略直接删除不符合条件的日志分段。
  2. 日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。

可以通过 broker 端参数 log.cleanup.policy 来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将 log.cleanup.policy 设置为“compact”,并且还需要将 log.cleaner.enable (默认值为true)设定为 true。 通过将log.cleanup.policy 参数设置为“delete,compact ”,还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到主题级别,比如与 log.cleanup.policy 对应的主题级别的参数为 cleanup.policy

日志删除

在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms来配置,默认值为 300000 ,即 5 分钟。当前日志分段的保留策略有3 种:

  • 基于时间的保留策略:日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentioMS)来寻找可删除的日志分段文件集合(deletableSegments),默认情况下日志分段文件的保留时间为 7 天。
  • 基于日志大小的保留策略:日志删除任务会检查当前日志的大小是否超过设定的阔值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。
  • 基于日志起始偏移量的保留策略:基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于logStartOffset,若是则可以删除此日志分段。

日志压缩

Kafka 中的 Log Compaction 是指在默认的日志删除(Log Retention)规则之外提供的一种清理过时数据的方式。Log Compaction 对于有相同 key 的不同value 值,只保留最后一个版本。如果应用只关心key 对应的最新value 值,则可以开启Kafka 的日志清理功能,Kafka 会定期将相同key 的消息进行合井,只保留最新的value 值。

磁盘存储

Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消 息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作。

有关测试结果表明,一个由 6 块 7200r/min 的 RAID-5 阵列组成的磁盘簇的线性(顺序)写入速度可以达到 600MB/s,而随机写入速度只有 1OOKB/s,两者性能相差 6000 倍。操作系统可以针对线性读写做深层次的优化,比如预读( read-ahead ,提前将一个比较大的磁盘块读入内存)和后写( write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。

页缓存

Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.messageslog.flush.interval.ms 等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过笔者并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。 参见 IO 系列 操作系统级IO

零拷贝

Kafka 还使用零拷贝( Zero-Copy )技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换 IO 系列 零拷贝