Kafka 系列 可靠性

深入理解Kafka

Posted by lichao modified on May 23, 2021

Kafka 多副本之间如何进行数据同步,尤其是在发生异常时候的处理机制又是什么?多副本间的数据一致性如何解决,基于的一致性协议又是什么?如何确保Kafka 的可靠性? Kafka 中的可靠性和可用性之间的关系又如何?

副本

副本(Replica)是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。

Kafka 中一个分区中包含一个或多个副本,其中一个为 leader 副本,其余为 follower 副本,各个副本位于不同的 broker 节点中。只有 leader 副本对外提供服务, follower 副本只负责数据同步。

分区中的所有副本统称为 AR , 而 ISR 是指与 leader 副本保持同步状态的副本集合,当然 leader 副本本身也是这个集合中的一员。

LEO 标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO, ISR 中最小的 LEO 即为 HW,俗称高水位,消费者只能拉取到 HW 之前的消息。从生产者发出的一条消息首先会被写入分区的leader 副本,不过还需要等待ISR 集合中的所有follower 副本都同步完之后才能被认为已经提交, 之后才会更新分区的HW ,进而消费者可以消费到这条消息。

失效副本

当 follower 副本将 leader 副本 LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上 leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。千万不要错误地认为 follower 副本只要拉取 leader 副本的数据就会更新 lastCaughtUpTimeMs。 试想一下,当 leader 副本中消息的流入速度大于 follower 副本中拉取的速度时,就算 follower 副本一直不断地拉取 leader 副本的消息也不能与 leader 副本同步。如果还将此 follower 副本置于 ISR 集合中,那么当 leader 副本下线而选取此 follower 副本为新的 leader 副本时就会造成消息的严重丢失。 Kafka 源码注释中说明了一般有两种情况会导致副本失效:

  • follower 副本进程卡住,在一段时间内根本没有向 leader 副本发起同步请求,比如频繁的 Full GC 。
  • follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 1/0 开销过大。

追赶副本

随着 follower 副本不断与 leader 副本进行消息同步, follower 副本的 LEO 也会逐渐后移,并最终追赶上 leader 副本,此时该 follower 副本就有资格进入 ISR 集合。追赶上 leader 副本的判定准则是此副本的 LEO 是否不小于leader 副本的 HW ,注意这里并不是和 leader 副本的LEO 相比。

ISR 变更

Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为 “isr-expiration” 和 “isr-change-propagation”。

isr-expiration 任务会周期性地检测每个分区是否需要缩减其 ISR 集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数值的一半, 默认值为 5000ms。当检测到ISR 集合中有失效副本时,就会收缩ISR 集合。

当 ISR 集合发生变更时会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation 任务会周期性(固定值为2500ms)地检查 isrChangeSet,如果发现 isrChangeSet 中有 ISR 集合的变更记录,那么它会在 ZooKeeper 的/isrchangenotification路径下创建一个以 isr_change_ 开头的持久顺序节点(比如 /isr_change_ notification/isrchange_0000000000 ),并将 isrChangeSet 中的信息保存到这个节点中。Kafka 控制器为/isrchangenotification 添加了一个 Watcher,当这个节点中有子节点发生变化时会触发 Watcher 的动作,以此通知控制器更新相关元数据信息并向它管理的 broker 节点发送更新元数据的请求,最后删除/isrchangenotification路径下已经处理过的节点。频繁地触发 Watcher 会影响 Kafka 控制器、ZooKeeper 甚至其他 broker 节点的性能。为了避免这种情况,Kafka 添加了限定条件,当检测到分区的 ISR 集合发生变化时,还需要检查以下两个条件:

  1. 上一次 ISR 集合发生变化距离现在已经超过 5s。
  2. 上一次写入 ZooKeeper 的时间距离现在已经超过 60s 。 满足以上两个条件之一才可以将ISR 集合的变化写入目标节点。

ISR 扩充之后同样会更新ZooKeeper 中的/brokers/topics/<topic>/partition/<parititon>/state 节点和isrChangeSet,之后的步骤就和ISR 收缩时的相同。

当ISR 集合发生增减时,或者 ISR 集合中任一副本的 LEO 发生变化时,都可能会影响整个分区的 HW。

副本同步

对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(Remote Replica), 本地副本是指对应的 Log 分配在当前的 broker 节点上,远程副本是指对应的 Log 分配在其他的 broker 节点上。在 Kafka 中,同一个分区的信息会存在多个 broker 节点上,并被其上的副本管理器所管理,这样在逻辑层面每个 broker 节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。 如下图所示,某个分区有 3 个副本分别位于 brokerO 、broker1 和 broker2 节点中,其中带阴影的方框表示本地副本。假设 brokerO 上的副本 1 为当前分区的 leader 副本,那么副本 2 和副本 3 就是 follower 副本,整个消息追加的过程可以概括如下:

  • 生产者客户端发送消息至 leader 副本(副本1) 中。
  • 消息被迫加到 leader 副本的本地日志,并且会更新日志的偏移量。
  • follower 副本(副本2 和副本3 )向leader 副本请求同步数据。在拉取的请求中会带有自身的 LEO 信息。
  • leader 副本所在的服务器读取本地日志,并更新对应拉取的 follower 副本的信息(请求中的 LEO 信息)。选取 follower 中 LEO 的最小值作为新的 HW。
  • leader 副本所在的服务器将拉取结果返回给 follower 副本。leader 副本返回给 follower 副本相应的消息,并且还带有自身的 HW 信息。
  • follower 副本收到 leader 副本返回的拉取结果,将消息追加到本地日志中。更新各自的LEO。follower 副本还会更新自己的 HW ,更新HW 的算法是比较当前 LEO 和 leader 副本中传送过来的 HW 的值, 取较小值作为自己的 HW 值。

分区副本

在一个分区中, leader 副本所在的节点会记录所有副本的LEO, 而 follower 副本所在的节点只会记录自身的 LEO ,而不会记录其他副本的 LEO 。对 HW 而言,各个副本所在的节点都只记录它自身的HW。

Leader Epoch 的介入

Kafka 从 0.11.0.0 开始引入了 leader epoch 的概念,在需要截断数据的时候使用 leader epoch 作为参考依据而不是原本的 HW 。leader epoch 代表 leader 的纪元信息(epoch),初始值为 0。每当 leader 变更一次,leader epoch 的值就会加 1,相当于为 leader 增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch =>StartOffset>,其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。每个副本的 Log 下都有一个leader-epoch-checkpoint 文件,在发生 leader epoch 变更时,会将对应的矢量对追加到这个文件中。

为什么不支持读写分离

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从而实现的是一种主写主读的生产消费模型。 从“收益点”这个角度来做具体分析。主写从读可以让从节点去分担主节点的负载压力,预防主节点负载过重而从节点却空闲的情况发生。但是主写从读也有 2 个很明显的缺点:

  1. 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中A 数据的值都为x, 之后将主节点中A 的值修改为Y,那么在这个变更通知到从节点之前, 应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  2. 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘这几个阶段。对延时敏感的应用而言, 主写从读的功能并不太适用。

消费者模型

主读从写-可以均摊一定的负载却不能做到完全的负载均衡,比如对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力,而绝大多数压力还是在主节点上。而在 Kafka 中却可以达到很大程度上的负载均衡,而且这种均衡是在主写主读的架构上实现的。如上图所示,在 Kafka 集群中有 3 个分区,每个分区有 3 个副本,正好均匀地分布在 3 个 broker 上,灰色阴影的代表 leader 副本,非灰色阴影的代表 follower 副本,虚线表示 follower 副本从 leader 副本上拉取消息。当生产者写入消息的时候都写入leader 副本,对于上图中的情形,每个 broker 都有消息从生产者流入;当消费者读取消息的时候也是从 leader 副本中读取的,对于上图中的情形,每个 broker 都有消息流出到消者。

每个 broker 上的读写负载都是一样的,这就说明 Kafka 可以通过主写主读实现主写从读实现不了的负载均衡。

在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。总的来说, Kafka 只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。为此,Kafka 又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于Kafka 优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宣之计。

日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个leader 来负责处理数据写入的顺序性。只要 leader 还处于存活状态,那么 follower 只需按照 leader 中的写入顺序来进行同步即可。

通常情况下,只要 leader 不宕机我们就不需要关心 follower 的同步问题。不过当 leader 宕机时,我们就要从 follower 中选举出一个新的 leader 。follower 的同步状态可能落后 leader 很多,甚至还可能处于宕机状态,所以必须确保选择具有最新日志消息的 follower 作为新的leader 。日志同步机制的一个基本原则就是: 如果告知客户端已经成功提交了某条消息,那么即使 leader 宕机,也要保证新选举出来的leader 中能够包含这条消息。这里就有一个需要权衡( tradeoff) 的地方,如果leader 在消息被提交前需要等待更多的 follower 确认,那么在它宕机之后就可以有更多的 follower 替代它,不过这也会造成性能的下降。

对于这种 tradeoff, 一种常见的做法是“少数服从多数”,它可以用来负责提交决策和选举决策。虽然 Kafka 不采用这种方式,但可以拿来探讨和理解 tradeoff 的艺术。在这种方式下,如果我们有 2f+1 个副本,那么在提交之前必须保证有 f+1 个副本同步完消息。同时为了保证能正确选举出新的 leader,至少要保证有 f+1 个副本节点完成日志同步并从同步完成的副本中选举出新的 leader 节点。并且在不超过 f 个副本节点失败的情况下,新的 leader 需要保证不会丢失已经提交过的全部消息。这样在任意组合的 f+1 个副本中,理论上可以确保至少有一个副本能够包含已提交的全部消息,这个副本的日志拥有最全的消息,因此会有资格被选举为新的 leader 来对外提供服务。

“少数服从多数”的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为3, 那么延迟就取决于最快的那个 follower 而不是最慢的那个(除了 leader,只需要另一个 follower 确认即可)。不过它也有一些劣势,为了保证 leader 选举的正常进行,它所能容忍的失败 follower 数比较少,如果要容忍 1 个follower 失败,那么至少要有 3 个副本,如果要容忍 2 个follower 失败,必须要有 5 个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这也就是“少数服从多数”的这种 Quorum 模型常被用作共享集群配置( 比如ZooKeeper ),而很少用于主流的数据存储中的原因。

与“少数服从多数”相关的一致性协议有很多, 比如Zab 、Raft 和Viewstamped Replication等。而Kafka 使用的更像是微软的PacificA 算法。

在 Kafka 中动态维护着一个 ISR 集合,处于 ISR 集合内的节点保持与 leader 相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable 配置为false)才有资格被选为新的 leader。 写入消息时只有等到所有ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为leader,选举过程简单、开销低,这也是 Kafka 选用此模型的重要因素。Kafka 中包含大量的分区, leader 副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响 Kafka 的性能指标。

在采用 ISR 模型和(f+1)个副本数的配置下,一个 Kafka 分区能够容忍最大 f 个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍 f 个节点失败,“少数服从多数”的方式和ISR 的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍 1 个节点失败,“少数服从多数” 需要 3 个副本和 1 个 follower 的确认信息, 采用 ISR 的方式需要 2 个副本和 1 个 follower 的确认信息。在需要相同确认信息数的情况下, 采用 ISR 的方式所需要的副本总数变少,复制带来的集群开销也就更低, “少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对 Kafka 而言,这种能力可以交由客户端自己去选择。

min.insync.replicas 这个参数指定了 ISR 集合中最小的副本数,如果不满足条件就会抛出 NotEnoughReplicasException 或 NotEnoughReplicasAfterAppendException。在正常的配置下,需要满足副本数>min.insync.replicas 参数的值。一个典型的配置方案为:副本数配置为 3, min.insync.replicas 参数值配置为 2, 注意 min.insync.replicas 参数在提升可靠性的时候会从侧面影响可用性。试想如果 ISR 中只有一个 leader 副本,那么最起码还可以使用,而此时如果配置 min.insync.replicas> 1 ,则会使消息无法写入。

另外,一般的同步策略依赖于稳定的存储系统来做数据恢复,也就是说,在数据恢复时日志文件不可丢失且不能有数据上的冲突。不过它们忽视了两个问题: 首先, 磁盘故障是会经常 发生的,在持久化数据的过程中并不能完全保证数据的完整性;其次,即使不存在硬件级别的故障,我们也不希望在每次写入数据时执行同步刷盘( fsync )的动作来保证数据的完整性,这样会极大地影响性能。而 Kafka 不需要岩机节点必须从本地数据日志、中进行恢复, Kafka 的同步方式允许宿机副本重新加入 ISR 集合,但在进入ISR 之前必须保证自己能够重新同步完 leader 中的所有数据。

可靠性分析

就 Kafka 而言,越多的副本数越能够保证数据的可靠性,副本数可以在创建主题时配置,也可以在后期修改,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。一般而言,设置副本数为 3 即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka 时就会设置副本数为 5。与此同时,如果能够在分配分区副本的时候引入机架信息(broker.rack参数),那么还要应对机架整体岩机的风险。

仅依靠副本数来支撑可靠性是远远不够的,大多数人还会想到生产者客户端参数 acks,相比于 0 和 1,acks = -1 (客户端还可以配置为all ,它的含义与 -1 一样,以下只以 -1 来进行陈述)可以最大程度地提高消息的可靠性。

  • 对于 acks = 1 的配置,生产者将消息发送到 leader 副本, leader 副本在成功写入本地日志之后会告知生产者己经成功提交。如果此时 ISR 集合的 follower 副本还没来得及拉取到 leader 中新写入的消息, leader 就宕机了,那么此次发送的消息就会丢失。
  • 对于 acks = -1 的配置,生产者将消息发送到 leader 副本, leader 副本在成功写入本地日志之后还要等待 ISR 中的 follower 副本全部同步完成才能够告知生产者已经成功提交,即使此时 leader 副本宕机,消息也不会丢失。同样对于acks = -1 的配置,如果在消息成功写入leader 副本之后,并且在被 ISR 中的所有副本同步之前 leader 副本宕机了,那么生产者会收到异常以此告知此次发送失败。

min.insync.replicas 默认为 1 时,如果所有的 follower 副本都被剔除出了 ISR 集合,那么 ISR 中只有一个 leader 副本,最终 acks=-1 演变为 acks = 1 的情形,如此也就加大了消息丢失的风险。

与可靠性和ISR 集合有关的还有一个参数 unclean.leader.election.enable。这个参数的默认值为 false ,如果设置为 true 就意味着当 leader 下线时候可以从非 ISR 集合中选举出新的 leader ,这样有可能造成数据的丢失。如果这个参数设置为 false,那么也会影响可用性,非ISR 集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。随着 Kafka 版本的变更,有的参数被淘汰,也有新的参数加入进来,而传承下来的参数一般都很少会修改既定的默认值,而unclean.leader.election.enable就是这样一个反例,从 0.11.0.0 版本开始, unclean.leader.election.enable的默认值由原来的 true 改为了 false ,可以看出 Kafka 的设计者愈发地偏向于可靠性的提升。

在 broker 端还有两个参数log.flush.interval.messageslog.flush.interval.ms,用来调整同步刷盘策略,默认是不做控制而交由操作系统本身来进行处理。同步刷盘是增强一个组件可靠性的有效方式, Kafka 也不例外。但绝大多数情景下,一个组件(尤其是大数据量的组件)的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而应该采用多副本的机制来保障。

enable.auto.commit 参数的默认值为 true,即开启自动位移提交的功能,虽然这种方式非常简便,但它会带来重复消费和消息丢失的问题,对于高可靠性要求的应用来说显然不可取,所以需要将 enable.auto.commit 参数设置为 false 来执行手动位移提交。在执行手动位移提交的时候也要遵循一个原则:如果消息没有被成功消费,那么就不能提交所对应的消费位移。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。有时候,由于应用解析消息的异常,可能导致部分消息一直不能够成功被消费,那么这个时候为了不影响整体消费的进度,可以将这类消息暂存到死信队列中,以便后续的故障排除。