上一篇【第47篇】Kafka延迟操作(DelayedOperation)源码解析——优雅处理等待响应
下一篇【第49篇】Kafka副本机制源码解析(一)——副本的"世界观"
摘要
上篇我们讲了DelayedOperation,但没有深究它背后的定时器是怎么工作的。Kafka没有用JDK自带的ScheduledThreadPool或DelayQueue,而是自己实现了一套层级时间轮(Hierarchical TimingWheel)——因为Broker上可能同时挂载数万个延迟任务,传统的堆/队列定时器在这种量级下会严重拖垮性能。
本文从"为什么不用JDK定时器"讲起,手绘时间轮的完整数据结构,逐行拆解add()、advanceClock()、reinsert()三大核心操作的源码,最后解析SystemTimer如何把时间轮封装成一个优雅的定时器服务。读完你会发现:原来定时器也可以这么写!
一、为什么不用 JDK 自带的定时器?
Kafka Broker 需要同时管理海量定时任务。一个中型集群的Broker上可能有上万个未完成的生产请求(每个都是一个DelayedProduce)和上万个等待数据的拉取请求(DelayedFetch),每个都需要超时管理。
我们先看看常见定时器的性能特点:
【定时器实现方案对比】 方案A: java.util.Timer(最小堆) ┌──────────────────────┐ │ 插入: O(log N) │ ← 每次插入都要调整堆 │ 删除: O(log N) │ ← 取消了要删除 │ 推进: O(1) │ ← 直接拿堆顶 │ │ │ 问题: 单线程,线程 │ │ 安全差,不适合 │ │ 高并发场景 │ └──────────────────────┘ 方案B: DelayQueue(PriorityBlockingQueue) ┌──────────────────────┐ │ 插入: O(log N) │ ← 加锁 + 堆调整 │ 删除: O(log N) │ ← 加锁 + 堆调整 │ 推进: O(1) │ ← poll() 取堆顶 │ │ │ 问题: 上万任务时, │ │ O(log N) 也扛 │ │ 不住 │ └──────────────────────┘ 方案C: Kafka TimingWheel(时间轮) ┌──────────────────────┐ │ 插入: O(1) ★ │ ← 直接定位槽位 │ 删除: O(1) ★ │ ← 从链表移除 │ 推进: O(1) ★ │ ← 移动指针即可 │ │ │ 代价: 空间换时间, │ │ 需要预分配槽位 │ └──────────────────────┘| 对比维度 | Timer/DelayQueue(堆) | Kafka TimingWheel |
|---|---|---|
| 插入复杂度 | O(log N) | O(1) |
| 删除复杂度 | O(log N) | O(1) |
| 推进复杂度 | O(1)(取堆顶) | O(1)(移动指针) |
| 10万任务时的插入 | 约 log₂(100000) ≈ 17次比较 | 1次哈希定位 |
| 内存开销 | 较小 | 较大(预分配槽数组) |
| 适用场景 | 任务数少(<1000) | 海量任务(万级以上) |
一句话总结:JDK定时器的O(log N)在小规模下完全够用,但在Kafka这种数万任务同频次增删的场景下,日志次方的开销累积起来就是性能瓶颈。时间轮用空间换时间,把增删都做到了O(1)。
二、单层时间轮:用"时钟"理解核心思想
时间轮的思想其实非常简单——想象一个钟表:
【单层时间轮结构(8个槽位,每个槽代表1ms)】 时间轮总跨度 = 8ms(tickMs=1ms, wheelSize=8) ┌─────────────────────────────────┐ │ TimingWheel │ │ │ │ 槽位0 槽位1 槽位2 │ │ ┌────┐ ┌────┐ ┌────┐ │ │ │ │ │ T2 │ │ │ │ │ └────┘ └────┘ └────┘ │ │ │ │ 槽位7 槽位6 ... 槽位3 │ │ ┌────┐ ┌────┐ ┌────┐ │ │ │ │ │ T1 │ │ │ │ │ └────┘ └────┘ └────┘ │ │ ↑ │ │ 当前时间=6ms │ │ 指针指向槽位6 │ └─────────────────────────────────┘ 任务T1:延迟2ms,放入 (6+2) % 8 = 槽位0 任务T2:延迟4ms,放入 (6+4) % 8 = 槽位2 当指针走到槽位0时,T1到期 → 执行 当指针走到槽位2时,T2到期 → 执行核心公式:
槽位编号 = (当前时间 + 延迟毫秒) / tickMs % wheelSize = (currentTime + delayMs) / tickMs % wheelSize但单层时间轮有个致命问题——时间跨度有限。如果wheelSize=20, tickMs=1ms,最多只能覆盖20ms的范围。那我要一个30ms后执行的任务怎么办?
答案:层级时间轮。
三、层级时间轮:像"水表"一样层层递进
Kafka的做法是把时间轮分层,就像水表上的千位轮走一圈,百位轮才走一格:
【层级时间轮结构——类比水表】 ┌──────────────────────┐ │ 第3层(百位轮) │ tickMs=100ms, wheelSize=20 │ 覆盖: 100~2000ms │ 走一格 = 100ms └──────────┬───────────┘ │ 走一圈(2000ms) → 向第4层进位 ┌──────────▼───────────┐ │ 第2层(十位轮) │ tickMs=10ms, wheelSize=20 │ 覆盖: 10~200ms │ 走一格 = 10ms └──────────┬───────────┘ │ 走一圈(200ms) → 向第3层进位 ┌──────────▼───────────┐ │ 第1层(个位轮) │ tickMs=1ms, wheelSize=20 │ 覆盖: 1~20ms │ 走一格 = 1ms └──────────────────────┘ ↓ 这个轮叫 overflowWheel(溢出轮) 层级递进规则: - 任务延迟 ≤ 当前层最大跨度 → 放入当前层 - 任务延迟 > 当前层最大跨度 → 放入上层(overflowWheel) - 上层指针走一格 → 该格的定时任务"降级"到下一层 示例: 30ms延迟的任务 → 第1层最多20ms,放不下 → 第2层最多200ms,放得下 → 放入第2层 → 第2层指针走到 (30/10)%20 = 槽位3 → 指针走到槽位3时,任务降级到第1层四、TimingWheel 源码解析 —— add/advanceClock/reinsert
现在看源码。Kafka的时间轮实现在TimingWheel.scala:
/** * Kafka 层级时间轮实现 * 位置:core/src/main/scala/kafka/utils/timer/TimingWheel.scala */@nonthreadsafeprivate[timer]classTimingWheel(tickMs:Long,// 每个槽位代表的毫秒数(基本时间粒度)wheelSize:Int,// 槽位数量startMs:Long,// 时间轮的起始时间戳taskCounter:AtomicInteger,// 全局任务计数器queue:DelayQueue[TimerTaskList]// 用于推进时间的延迟队列){// 当前时间轮的总跨度 = tickMs * wheelSizeprivate[this]val interval=tickMs*wheelSize// 槽位数组:每个槽位是一个 TimerTaskList(双向链表)private[this]val buckets=Array.tabulate[TimerTaskList](wheelSize){_=>newTimerTaskList(taskCounter)}// 当前时间(已推进到的时间点)private[this]varcurrentTime=startMs-(startMs%tickMs)// 上层时间轮(当任务延迟超过本层跨度时使用)@volatileprivate[this]varoverflowWheel:TimingWheel=null// ============ 核心方法 1: add() ============/** * 添加一个定时任务到时间轮 * * @return false 表示任务已过期,应立即执行 * true 表示成功添加到时间轮 */defadd(timerTaskEntry:TimerTaskEntry):Boolean={val expiration=timerTaskEntry.expirationMs// 任务的过期时间戳if(timerTaskEntry.cancelled()){// 任务已被取消,直接忽略false}elseif(expiration<currentTime+tickMs){// ★ 任务已经过期(或即将在下一个tick过期)// 返回 false 让调用者立即执行false}elseif(expiration<currentTime+interval){// ★ 任务在本层时间轮的覆盖范围内// 计算应该放入哪个槽位val virtualId=(expiration/tickMs).toInt val bucket=buckets(virtualId%wheelSize.toLong)bucket.add(timerTaskEntry)// 如果这个槽位刚被激活(之前是空的),设置过期时间if(bucket.setExpiration(virtualId*tickMs)){// 放入 DelayQueue 来驱动时间推进queue.offer(bucket)}true}else{// ★ 任务延迟超过本层范围 → 交给上层时间轮if(overflowWheel==null){// 懒创建上层时间轮addOverflowWheel()}overflowWheel.add(timerTaskEntry)}}// ============ 核心方法 2: advanceClock() ============/** * 推进时间轮的当前时间。 * 这个方法由 SystemTimer 的驱动线程调用。 * * @param timeMs 要推进到的时间点 */defadvanceClock(timeMs:Long):Unit={if(timeMs>=currentTime+tickMs){// 将当前时间向下对齐到 tickMs 的倍数currentTime=timeMs-(timeMs%tickMs)// ★ 同时推进上层时间轮(递归)if(overflowWheel!=null){overflowWheel.advanceClock(currentTime)}}}// ============ 辅助方法: addOverflowWheel() ============private[this]defaddOverflowWheel():Unit={// 上层时间轮的 tickMs = 当前层的 tickMs * wheelSize// 即:上层时间轮每个槽代表的时间粒度更粗overflowWheel=newTimingWheel(tickMs=interval,// ★ 关键:上层tick = 本层intervalwheelSize=wheelSize,startMs=currentTime,taskCounter=taskCounter,queue=queue// ★ 共享同一个 DelayQueue!)}}add() 方法的决策树:
【add(task) 决策流程】 add(task) 被调用 │ ├─ 任务已取消? │ └─ YES → 返回 false(忽略) │ ├─ expiration < currentTime + tickMs? │ └─ YES → 返回 false(已过期,立即执行) │ ├─ expiration < currentTime + interval? │ └─ YES → 放入本层槽位 buckets[idx] │ → 如果槽位刚激活,加入 queue(推进用) │ → 返回 true │ └─ NO → 延迟超出本层范围 → 懒创建 overflowWheel → overflowWheel.add(task) ← 递归!五、TimerTaskList —— 槽位的"双向链表"实现
每个槽位不是简单存一个任务,而是一个带过期时间的双向链表:
/** * 时间轮槽位 —— 存储同一时间点到期的一组任务 * 位置:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala */classTimerTaskList(taskCounter:AtomicInteger)extendsDelayed{// 哨兵节点(简化边界处理)privateval root=newTimerTaskEntry(null,-1)root.next=root root.prev=root// 这个槽位的过期时间privateval expiration=newAtomicLong(-1L)// 设置过期时间(只有在槽位为空时才能设置)defsetExpiration(expirationMs:Long):Boolean={expiration.getAndSet(expirationMs)!=expirationMs}// 添加任务到链表尾部defadd(timerTaskEntry:TimerTaskEntry):Unit={vardone=falsewhile(!done){// 先移除任务原有的链表关系(可能之前在别的槽位)timerTaskEntry.remove()synchronized{timerTaskEntry.list=this// 标记属于这个列表val tail=root.prev timerTaskEntry.next=root timerTaskEntry.prev=tail tail.next=timerTaskEntry root.prev=timerTaskEntry taskCounter.incrementAndGet()done=true}}}// 执行该槽位的所有到期任务defflush(f:TimerTaskEntry=>Unit):Unit={synchronized{varhead=root.nextwhile(head ne root){remove(head)// 从链表移除if(!head.cancelled){f(head)// 执行回调(最终会调用 task.run())}head=root.next}expiration.set(-1L)// 重置过期时间}}// DelayQueue 接口:获取过期时间override defgetDelay(unit:TimeUnit):Long={unit.convert(expiration.get()-System.currentTimeMillis(),MILLISECONDS)}}关键设计点:
- TimerTaskList 实现了
Delayed接口—— 这样它就能放入DelayQueue。系统不需要轮询每个槽位,而是用DelayQueue.take()阻塞等待最近一个到期的槽位。 - 哨兵节点(root)—— 避免空链表时的空指针判断。
flush()方法—— 当时间指针走到这个槽位时,一次性执行该槽位的所有任务。
六、SystemTimer —— 把时间轮封装成完整的定时器服务
SystemTimer是时间轮的"操盘手",它负责驱动时间推进:
/** * 系统定时器 —— 时间轮的上层封装 * 位置:core/src/main/scala/kafka/utils/timer/SystemTimer.scala */classSystemTimer(executorName:String,tickMs:Long=1,// 每个槽 1mswheelSize:Int=20,// 每层 20 个槽startMs:Long=System.currentTimeMillis())extendsTimerwithLogging{// 底层时间轮(初始只有一层)private[this]val timingWheel=newTimingWheel(tickMs=tickMs,wheelSize=wheelSize,startMs=startMs,taskCounter=taskCounter,queue=delayQueue)// DelayQueue:用于阻塞等待下一个到期的槽位private[this]val delayQueue=newDelayQueue[TimerTaskList]()// 守护线程:负责推进时间private[this]val readWriteLock=newReentrantReadWriteLock()private[this]val writeLock=readWriteLock.writeLock()// ============ 核心:驱动线程 ============/** * 添加任务时,如果这会改变"最近到期时间",可能需要唤醒驱动线程。 * 但 Kafka 的实现更巧妙: * * 驱动线程用 delayQueue.take() 阻塞等待最近到期的槽位 * → 当有新的更早到期的任务加入时 → 新槽位被 offer 进 queue * → 不需要显式唤醒(take() 本身就是阻塞等待的) */// ============ 添加任务 ============defadd(timerTask:TimerTask):Unit={readLock.lock()try{// 创建 TimerTaskEntry 包装任务val taskEntry=newTimerTaskEntry(timerTask,timerTask.delayMs+System.currentTimeMillis())// 委托给时间轮处理 add →// → 过期了?返回 false,直接运行// → 没过期?放入对应层级、对应槽位if(!timingWheel.add(taskEntry)){// 已过期的任务,直接在线程池中执行if(!taskEntry.cancelled){taskExecutor.submit(timerTask)}}}finally{readLock.unlock()}}// ============ 推进时间(由 AdvanceThread 调用) ============defadvanceClock(timeoutMs:Long):Boolean={// 从 DelayQueue 获取最近到期的槽位(阻塞等待最多 timeoutMs 毫秒)varbucket=delayQueue.poll(timeoutMs,TimeUnit.MILLISECONDS)if(bucket!=null){writeLock.lock()try{while(bucket!=null){// 推进时间轮到该槽位的过期时间timingWheel.advanceClock(bucket.getExpiration())// 执行该槽位的所有到期任务// flush 内部会遍历链表,对每个任务调用 task.run()bucket.flush(reinsert)// 尝试获取下一个到期的槽位(不阻塞)bucket=delayQueue.poll()}}finally{writeLock.unlock()}true}else{false// 没有到期的任务}}// ============ 任务降级:reinsert ============/** * 当一个任务在上层时间轮到期时,它被"降级"到下层。 * 但直接用 add() 就行 —— add() 会自动判断该放哪一层。 */private[this]val reinsert:TimerTaskEntry=>Unit={timerTaskEntry=>addTimerTaskEntry(timerTaskEntry)}}七、完整运行流程:一个30ms延迟任务的生命周期
通过一个完整例子串联所有组件:
【30ms延迟任务的完整生命周期】 假设:tickMs=1ms, wheelSize=20, startMs=0 第1层跨度 = 20ms, 第2层跨度 = 400ms 时间线: ───────────────────────────────────────────────────────────► T=0ms: add(task, delay=30ms) │ │ SystemTimer.add() │ └─ timingWheel.add(taskEntry) │ │ │ ├─ expiration=30 < currentTime+tickMs(1)? NO │ ├─ expiration=30 < currentTime+interval(20)? NO │ └─ 创建 overflowWheel (tickMs=20ms, interval=400ms) │ └─ overflowWheel.add(taskEntry) │ │ 当前第2层 currentTime=0 │ ├─ expiration=30 < 0+20? NO │ ├─ expiration=30 < 0+400? YES → 放入第2层 │ │ slot = (30/20) % 20 = 槽位1 │ │ bucket[1].setExpiration(20ms) │ │ delayQueue.offer(bucket[1]) │ └─ true ✓ │ ▼ T=20ms: delayQueue.poll() 返回 bucket[1] │ │ SystemTimer.advanceClock() │ │ │ ├─ timingWheel.advanceClock(20) │ │ currentTime = 20ms │ │ overflowWheel.advanceClock(20) → 递归推进上层 │ │ │ └─ bucket[1].flush(reinsert) │ │ │ ├─ 遍历链表中的 taskEntry │ ├─ 对每个 taskEntry 调用 reinsert │ │ └─ timingWheel.add(taskEntry) ← 再次添加! │ │ │ expiration=30 < 20+1? NO │ │ │ expiration=30 < 20+20? YES → 放入第1层 │ │ │ slot = (30/1) % 20 = 槽位10 │ │ │ bucket[10].setExpiration(30ms) │ │ │ delayQueue.offer(bucket[10]) │ │ └─ true ✓ │ │ │ └─ 清空 bucket[1] │ ▼ T=30ms: delayQueue.poll() 返回 bucket[10] │ │ SystemTimer.advanceClock() │ │ │ ├─ timingWheel.advanceClock(30) │ │ currentTime = 30ms │ │ │ └─ bucket[10].flush(reinsert) │ │ │ ├─ 遍历链表中的 taskEntry │ ├─ 对每个 taskEntry 调用 reinsert │ │ └─ timingWheel.add(taskEntry) │ │ expiration=30 < 30+1? YES → 返回 false! │ │ ──► SystemTimer.add() 中: │ │ taskExecutor.submit(timerTask) ★ │ │ └─ timerTask.run() → forceComplete() │ │ │ └─ 清空 bucket[10] │ ▼ TASK EXECUTED ✓八、时间轮 vs 其他定时器方案的终极对比
| 维度 | Timer (堆) | DelayQueue (堆) | ScheduledThreadPool | Kafka TimingWheel |
|---|---|---|---|---|
| 插入 | O(log N) | O(log N) | O(log N) | O(1) |
| 删除 | O(log N) | O(log N) | O(log N) | O(1) |
| 推进 | O(1) | O(1) | O(1) | O(1) |
| 线程模型 | 单线程 | 无自带线程 | 线程池 | 单推进线程 + 任务线程池 |
| 海量任务 | ❌ 性能退化严重 | ❌ 同左 | ❌ 同左 | ✅ 专为此设计 |
| 精度 | 毫秒级 | 毫秒级 | 毫秒级 | 毫秒级(由tickMs决定) |
| 内存开销 | 小 | 小 | 小 | 较大(预分配槽数组) |
| 适用场景 | <1000任务 | <5000任务 | <10000任务 | 万级以上任务 |
Kafka时间轮的"必杀技":
- O(1) 时间复杂度的增删—— 不需要调整堆,直接哈希定位槽位
- DelayQueue 驱动推进—— 不需要轮询,阻塞等待最近到期的槽位,零 CPU 浪费
- 层级设计—— 长延迟任务自动"上浮"到粗粒度层,到期时再"沉降"回来
- 惰性创建—— overflowWheel 只在需要时才创建,节省内存
本篇小结
Kafka的时间轮是一个教科书级别的"空间换时间"设计:
- 单层时间轮用哈希定位实现O(1)的增删,但时间跨度受限
- 层级时间轮通过"上层粗粒度、下层细粒度"的分层设计,既保持O(1)性能,又能覆盖任意长度的时间范围
- SystemTimer用DelayQueue + 推进线程优雅地驱动整个时间轮,无需轮询
- reinsert机制实现了任务在不同层级之间的"升降级":延迟长时放在粗粒度层,快到期时降到细粒度层
这套设计从Kafka 0.8版本就开始使用,经受住了全球数万集群的考验。理解了它,你再看Netty的HashedWheelTimer、Dubbo的时间轮,会发现思路其实是相通的。
上一篇【第47篇】Kafka延迟操作(DelayedOperation)源码解析——优雅处理等待响应
下一篇【第49篇】Kafka副本机制源码解析(一)——副本的"世界观"