在Kafka中有多种延时操作,比如延时生产(DelayedProduce)、延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。就延时生产操作而言,它的外部事件是所要写入消息的某个分区的HW (高水位)发生增长。也就是说,随着 follower 副本不断地与 leader 副本进行消息同步,进而促使 HW 进一步增长, HW 每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。
延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。提及时间轮的轮转是靠“收割机”线程 ExpiredOperationReaper 来驱动的,这里的“收割机”线程就是由延时操作管理器启动的。也就是说,定时器、“收割机”线程和延时操作管理器都是一一对应的。延时操作需要支持外部事件的触发,所以还要配备一个监听池来负责监听每个分区的外部事件一一查看是否有分区的 HW 发生了增长。另外需要补充的是, ExpiredOperationReaper 不仅可以推进时间轮,还会定期清理监听池中已完成的延时操作。
上图描绘了客户端在请求写入消息到收到响应结果的过程中与延时生产操作相关的细节, 在了解相关的概念之后应该比较容易理解: 如果客户端设置的 acks 参数不为 -1 ,或者没有成功的消息写入,那么就直接返回结果给客户端,否则就需要创建延时生产操作并存入延时操作管理器,最终要么由外部事件触发,要么由超时触发而执行。