Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)
Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能。JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O(nlogn)并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为0(1)。
Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列, 底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度( interval) 可以通过公式 tickMs × wheelSize
计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需
要处理此时间格所对应的 TimerTaskList 中的所有任务。
若时间轮的 tickMs 为 1ms 且 wheel Size 等于20 ,那么可以计算得出总体时间跨度 interval 为 20ms。初始情况下表盘指针 currentTime 指向时间格 0,此时有一个定时为 2ms 的任务插进来会存放到时间格为 2 的 TimerTaskList 中。随着时间的不断推移, 指针 currentTime 不断向前推进,过了 2ms 之后,当到达时间格 2 时,就需要将时间格 2 对应的TimeTaskList 中的任务进行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中,currentTime 再过 8ms 后会指向时间格10。
如果同时有一个定时为 19ms 的任务插进来怎么办?新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入原本己经到期的时间格 1。总之,整个时间轮的总体跨度是不变的,随着指针 currentTime 的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在 currentTime 和 currentTime+interval 之间。
为了满足大范围的延时任务,Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
复用之前的案例,第一层的时间轮 tickMs=1ms 、wheelSize=20 、inteval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval ,即 20ms。每一层时间轮的 wheelSize 是固定的都是 20 ,那么第二层的时间轮的总体时间跨度interval 为 400ms 。以此类推,这个 400ms 也是第三层的 tickMs 的大小, 第三层的时间轮的总体时间跨度为 8000ms 。
对于之前所说的 350ms 的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格 17 所对应的 TimerTaskList。如果此时又有一个定时为450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格 1 的 TimerTaskList 。注意到在到期时间为[400ms, 800ms) 区间内的多个任务(比如446ms 、455ms 和473ms 的定时任务)都会被放入第三层时间轮的时间格 1 ,时间格 1 对应的 TimerTaskList 的超时时间为 400ms 。随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。
这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历 40ms 之后,此时这个任务又被“察觉”,不过还剩余 1Oms ,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[ 1Oms, 11 ms) 的时间格中,之后再经历 1Oms 后,此任务真正到期,最终执行相应的到期操作。
在Kafka 中,第一层时间轮的参数同上面的案例一样:tickMs=1ms, whee1Size=20、interval=20ms,各个层级的 wheelSize 也固定为20,所以各个层级的 tickMs 和 interval 也可以相应地推算出来。
TimingWheel 中的每个双向环形链表 TimerTaskList 都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点,那么线性表的第一个元素应该是链表的第二个节点。
Kafka 中的定时器借助于 JDK 中的 DelayQueue 来推进时间轮。具体做法是对于每个使用到的 TimerTaskList 都加入 DelayQueue ,“每个用到的 TimerTaskList” 特指非哨兵节点的定时任务项 TimerTaskEntry 对应的 TimerTaskList。 DelayQueue 会根据 TimerTaskList 对应的超时时间 expiration 来排序, 最短 expiration 的 TimerTaskList 会被排在 DelayQueue 的队头。Kafka 中会有一个线程来获取 DelayQueue 中到期的任务列表,有意思的是这个线程所对应的名称叫作“ ExpiredOperationReaper ”,可以直译为“过期操作收割机”,当“收割机”线程获取 DelayQueue 中超时的任务列表 TimerTaskList 之后,既可以根据 TimerTaskList 的 expiration 来推进时间轮的时间,也可以就获取的 TimerTaskList 执行相应的操作,对里面的 TimerTaskEntry 该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。
Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry 的操作,而 DelayQueue 专门负责时间推进的任务。用 TimingWheel 做最擅长的任务添加和删除操作,而用 DelayQueue 做最擅长的时间推进工作,两者相辅相成。