在 Kafka 集群中会有一个或多个 broker ,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。
- 当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。
- 当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。
- 当使用
kafka-topics.sh
脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
控制器在选举成功之后会读取 ZooKeeper 中各个节点的数据来初始化上下文信息(ControllerContext),并且需要管理这些上下文信息。比如为某个主题增加了若干分区,控制器在负责创建这些分区的同时要更新上下文信息,并且需要将这些变更信息同步到其他普通的 broker 节点中。不管是监听器触发的事件,还是定时任务触发的事件,或者是其他事件( 比如ControlledShutdown)都会读取或更新控制器中的上下文信息, 那么这样就会涉及多线程间的同步。如果单纯使用锁机制来实现, 那么整体的性能会大打折扣。针对这一现象, Kafka 的控制器使用单线程基于事件队列的模型, 将每个事件都做一层封装, 然后按照事件发生的先后顺序暂存到 LinkedBlockingQueue 中,最后使用一个专用的线程(ControllerEventThread)按照 FIFO ( First Input First Output,先入先出)的原则顺序处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全。
分区 leader 选举
分区 leader 副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的 leader 副本下线,此时分区需要选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OfflinePartitionLeaderElectionStrategy 。这种策略的基本思路是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中。一个分区的AR 集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变。
分区中的所有副本统称为 AR , 而 ISR 是指与 leader 副本保持同步状态的副本集合,当然 leader 副本本身也是这个集合中的一员。
当分区进行重分配的时候也需要执行 leader 的选举动作,对应的选举策略为 ReassignPartitionLeaderElectionStrategy。这个选举策略的思路比较简单:从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。
如果 ISR 集合中没有可用的副本, 那么此时还要再检查一下所配置的
unclean.leader.election.enable
参数(默认值为false)。如果这个参数配置为true ,那么表示允许从非 ISR 列表中的选举 leader,从 AR 列表中找到第一个存活的副本即为 leader 。
当发生优先副本的选举时,直接将优先副本设置为 leader 即可, AR 集合中的第一个副本即为优先副本(PreferredReplicaPartitionLeaderElectionStrategy)。
还有一种情况会发生 leader 的选举,当某节点被优雅地关闭(也就是执行 ControlledShutdown)时,位于这个节点上的 leader 副本都会下线,所以与此对应的分区需要执行 leader 的选举。与此对应的选举策略( ControlledShutdownPartitionLeaderElectionStrategy )为:从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。