Kafka 系列 消费者(Java)

深入理解Kafka

Posted by lichao modified on May 24, 2021

本文主要介绍目前流行的新消费者(Java 语言编写的)客户端。

一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数及创建相应的消费者实例。
  2. 订阅主题。
  3. 拉取消息并消费。
  4. 提交消费位移。
  5. 关闭消费者实例。

订阅主题

Kafka 同一个消费组可以同时订阅多个主题。

拉取消息

Kafka 中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

提交位移

对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题 consumer offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交’,消费者在消费完消息之后需要执行消费位移的提交。 消费位移 消费位移 x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了x 位置的消息,那么就可以说消费者的消费位移为 x ,图中也用了 lastConsumedOffset 这个单词来标识它。 当前消费者需要提交的消费位移是下一条需要拉取的消息的位置,对应于图中的 position

在消费者中还有一个 committed offset 的概念,它表示已经提交过的消费位移。

自动位移提交

在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动位移提交也无法做到精确的位移管理。自动提交消费位移的方式会带来重复消费和消息丢失的问题,出现消息丢失的原因是自动位移提交不会判断消息是否被业务处理,可以理解为拉取\提交消息线程和业务处理线程独立运行。

手动位移提交

如果业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。手动提交就避免消息丢失,不能避免消息重复。

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和 commitAsync()两种类型的方法。

同步提交

同步提交有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩渍,那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

1
2
3
4
5
6
7
whileisRunning.get()) {
  ConsumerRecords<String, String> records= consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
    //do some logical processing.
  }
  consumer.commitSync();
}

commitSync()方法会根据 poll()方法拉取的最新位移来进行提交。只要没有发生不可恢复的错误( Unrecoverable Eηor ),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException 、WakeupException 、InterruptException 、AuthenticationException 、AuthorizationException 等,可以将其捕获并做针对性的处理。

对于采用 commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,具体定义如下:

1
public void commitSync(final Map<TopicPartition , OffsetAndMetadata> offsets)

该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync()方法只能提交当前批次对应的 position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式。

异步提交

在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

默认位移

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为latest,表示从分区末尾开始消费消息。如果将 auto.offset.reset 参数配置为 earliest,那么消费者会从起始处,也就是 0 开始消费。

auto.offset.reset 参数还有一个可配置的值none,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException 异常。

除了查找不到消费位移,位移越界也会触发 auto.offset.reset 参数的执行

__consumer_offsets 剖析

位移提交的内容最终会保存到 Kafka 的内部主题 __consumer_offsets 中。一般情况下,当集群中第一次有消费者消费消息时会自动创建主题 __consumer_offsets ,不过它的副本因子还受 offsets.topic.replication.factor参数的约束,这个参数的默认值为 3。分区数可以通过 offsets.topic.num.partitions 参数设置,默认为 50。 消息中的重要参数:

  • retentiontime 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为 -1。也就是说,按照 broker 端的配置 offsets.topic.num.partitions 来确定保留时长。offsets.topic.num.partitions 的默认值为10080 ,即 7 天,超过这个时间后消费位移的信息就会被删除(使用墓碑消息和日志压缩策略)。

__consumer_offsets消息格式 上图中展示了消费位移对应的消息内容格式,上面是消息的 key,下面是消息的 value。 可以看到 key 和 value 中都包含了 version 宇段,这个用来标识具体的 key 和 value 的版本信息,不同的版本对应的内容格式可能并不相同。就目前版本而言, key 和 value 的 version 值都为 1 。key 中除了 version 字段还有 group 、topic 、partition 字段,分别表示消费组的 groupId 、主题名称和分区编号。虽然 key 中包含了 4 个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同一个broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。

value 中包含了 5 个字段,除 version 字段外,其余的 offset 、metadata 、commit timestamp、expire timestamp 字段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳。

再平衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。

不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。

分区分配策略

Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为org.apache.kafka. clients.consumer.RangeAssignor,即采用 RangeAssignor 分配策略。除此之外, Kafka 还提供了另外两种分配策略: RoundRobinAssignor 和StickyAssignor 。

  • RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
  • RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor 分配策略对应的 partition.assignment.strategy 参数值为org.apache.kafka.clients.consumer.RoundRobinAssignor
  • StickyAssignor 分配策略主要有两个目的: 1. 分区分配要尽可能均匀。2. 分区的分配尽可能与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。该分配策略具备一定的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

RangeAssignor 没有统一考录用一个消费组内的所有 topic,当同一个消费组内多个 topic 分配不均聚集的时候,则有可能出现部分消费者过载的情况。

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 分配策略的分区分配会是均匀的。如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。

消费组控住器

如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器( ConsumerCoordinator )和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。

在新版的消费者客户端中,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个 GroupCoordinator 对其进行管理, GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。

ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。

具体查找 Group Coordinator 的方式是先根据消费组 groupid 的哈希值计算 _consumer_offsets 主题中的分区编号。

1
Utils.abs(groupid.hashCode  % groupMetadataTopicPartitionCount

其中 groupid.hashCode 就是使用 Java 中 String 类的 hashCode()方法获得的, groupMetadataTopicPartitionCount 为主题 _consumer_offsets 的分区个数,这个可以 通过 broker 端参数 offsets.topic.num.partitions 来配置,默认值为 50。

找到对应的 consumer offsets 中的分区之后,再寻找此分区 leader 副本所在的 broker 节点,该 broker 节点即为这个 groupId 所对应的 GroupCoordinator 节点。消费者 groupId 最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区 leader 副本所在的 broker 节点,让此 broker 节点既扮演 GroupCoordinator 的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

再均衡原理

有如下几种情形会触发再均衡的操作:

  • 有新的消费者加入消费组。
  • 有消费者若机下线。消费者并不一定需要真正下线,例如遇到长时间的GC 、网络延迟导致消费者长时间未向 Group Coordinator 发送心跳等情况时,GroupCoordinator 会认为消费者己经下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest 请求)。比如客户端调用了 unsubscrible()方法取消对某些主题的订阅。
  • 消费组所对应的 GroupCoorinator 节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。

  1. 第一阶段(FIND_COORDINATOR): 消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 broker,并创建与该 broker 相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator,这里的“某个节点” 并非是集群中的任意节点,而是负载最小的节点,即 leastLoadedNode。
  2. 第二阶段(JOIN GROUP): 在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 Group Coordinator 发送 JoinGroupRequest 请求,并处理响应。
  3. 第三阶段(SYNC GROUP): leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时 leader 消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向 GroupCoordinator 发送 SyncGroupRequest 请求来同步分配方案,只有 leader 消费者发送的 SyncGroupRequest 请求中才包含具体的分区分配方案。
  4. 第四阶段(HEARTBEAT): 消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。消费者的心跳间隔时间由参数 heartbeat.interval.ms指定,默认值为 3000 ,即 3 秒, 这个参数必须比 session.timeout.ms 参数设定的值要小,一般情况下 heartbeat.interval.ms 的配置值不能超过 session.timeout.ms 配置值的1/3 。这个参数可以调整得更低,以控制正常重新平衡的预期时间。如果一个消费者发生崩溃,并停止读取消息,那么 GroupCoordinator 会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由 session.timeout.ms 参数控制,该参数的配置值必须在 broker 端参数 group.min.session.timeout.ms (默认值为6000 ,即 6 秒)和 group.max.session.timeout.ms (默认值为300000 ,即 5 分钟)允许的范围内。

进入第四阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator , 并且 GroupCoordinator 将其保存到了 Kafka 内部的_consumer_offsets 主题中,此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消费位移并从此处继续消费。

还有一个参数 max.poll.interval.ms ,它用来指定使用消费者组管理时poll()方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用, 则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

线程模型

消费者线程模型 通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,最常见的方式:线程封闭,即为每个线程实例化一个 KafkaConsumer 对象(KafkaConsumer 却是非线程安全的)。一般而言,分区是消费线程的最小划分单位。 一个线程对应一个 KafkaConsumer 实例,可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。

这种多线程的实现方式和开启多个消费进程的方式没有本质上的区别,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的 TCP 连接, 如果分区数和consumerThreadNum 的值都很大,那么会造成不小的系统开销。

第二种线程模型

第二种方式是多个消费线程同时消费同一个分区,这个通过assign()seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少。

第三种线程模型

对于方法一而言,如果对消息的处理非常迅速,那么 poll()拉取的频次也会更高,进而整体消费的性能也会提升。相反,如果对消息的处理缓慢,比如进行一个事务性操作,或者等待一个 RPC 的同步响应,那么poll()拉取的频次也会随之下降,进而造成整体消费性能的下降。一般而言,poll() 拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,如果我们通过-定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。考虑第三种实现方式,将处理消息模块改成多线程的实现方式。 第三种消费者线程模型 第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP 连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。

每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量 offsets 中, KafraConsumerThread 在每一次 poll()方法之后都读取 offsets 中的内容并对其进行位移提交。注意在实现的过程中对 offsets 读写需要加锁处理,防止出现并发问题。井且在写入 offsets 的时候需要注意位移覆盖的问题。

其实这种位移提交的方式会有数据丢失的风险。对于同一个分区中的消息,假设一个处理线程 RecordHandler1 正在处理 offset 为 0~99 的消息,而另一个处理线程 RecordHandler2 已经处理完了offset 为 100~199 的消息并进行了位移提交,此时如果 RecordHandler1 发生异常,则之后的消费只能从 200 开始而无法再次消费 0~99 的消息,从而造成了消息丢失的现象。

第四种线程模型

基于滑动窗口实现,这里的滑动窗口式的实现方式是将拉取到的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小,总体上而言没有太多的变化,不同的是对于消费位移的把控。

第四种消费者线程模型

每一个方格代表一个批次的消息,一个滑动窗口包含若干方格, startOffset 标注的是当前滑动窗口的起始位置, endOffset 标注的是末尾位置。每当 startOffset 指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来 startOffset 所指方格中对应的消息,并且拉取新的消息进入窗口。滑动窗口的大小固定,所对应的用来暂存消息的缓存大小也就固定了,这部分内存开销可控。方格大小和滑动窗口的大小同时决定了消费线程的并发数: 一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高:对于方格大小固定的情况,窗口越大并行度越高。不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常(比如Crash )的情况下也会引起大量的重复消费,同时还考虑线程切换的开销, 建议根据实际情况设置一个合理的值,不管是对于方格还是窗口而言,过大或过小都不合适。

如果一个方格内的消息无法被标记为消费完成,那么就会造成startOffset 的悬停。为了使窗口能够继续向前滑动, 那么就需要设定一个闹值,当startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列。