
Kafka's TimingWheel

Background

Apache Kafka uses a TimingWheel internally in place of java.util.concurrent.DelayQueue and java.util.Timer, mainly to reduce the time complexity of task insertion and deletion from O(log n) to O(1).
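To see where the O(1) comes from, here is a minimal single-level sketch (not Kafka's code; the names and structure are invented for illustration): inserting a task is just an array index computed from its expiration time, whereas a heap-based DelayQueue or Timer pays O(log n) per insert.

import scala.collection.mutable.ListBuffer

// Toy single-level timing wheel: wheelSize buckets, each tickMs wide.
class ToyWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val buckets = Array.fill(wheelSize)(ListBuffer.empty[() => Unit])
  private var currentTime = startMs - (startMs % tickMs) // round down to a tick boundary

  // O(1) insert: map the expiration time to a bucket index; no ordering work is done.
  def add(expirationMs: Long)(task: () => Unit): Boolean =
    if (expirationMs < currentTime + tickMs) false                   // already expired
    else if (expirationMs >= currentTime + tickMs * wheelSize) false // out of range; would need an overflow wheel
    else {
      val virtualId = expirationMs / tickMs
      buckets((virtualId % wheelSize).toInt) += task
      true
    }
}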

Inserting tasks

Overall, TimingWheel is only a few dozen lines of code; the core of it is shown below.

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs

  // overflowWheel can potentially be updated and read by two concurrent threads through add().
  // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
  @volatile private[this] var overflowWheel: TimingWheel = null

  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
        overflowWheel = new TimingWheel(
          tickMs = interval,
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter,
          queue
        )
      }
    }
  }

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled
      false
    } else if (expiration < currentTime + tickMs) {
      // Already expired
      false
    } else if (expiration < currentTime + interval) {
      // Put in its own bucket
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // Set the bucket expiration time
      if (bucket.setExpiration(virtualId * tickMs)) {
        // The bucket needs to be enqueued because it was an expired bucket
        // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
        // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
        // will pass in the same value and hence return false, thus the bucket with the same expiration will not
        // be enqueued multiple times.
        queue.offer(bucket)
      }
      true
    } else {
      // Out of the interval. Put it into the parent timer
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  // Try to advance the clock
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)

      // Try to advance the clock of the overflow wheel if present
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

Adding a task corresponds to the add method above, and the logic is straightforward: if the task has been cancelled or has already expired, nothing is done; if it falls within the range of the first wheel, it is appended directly to the linked list of the matching bucket; if it is beyond the first wheel's range, it is added recursively through overflowWheel. This works like carrying digits in positional arithmetic: overflowWheel keeps adding levels as the delay exceeds each level's range by further multiples, as sketched below.
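As a rough illustration of the "carrying" analogy (this helper is not Kafka code, just a sketch reusing the tickMs/wheelSize/interval definitions above): with tickMs = 1 ms and wheelSize = 20, level 0 covers 20 ms, level 1 covers 400 ms, level 2 covers 8000 ms, and so on, so a 350 ms delay overflows level 0 and lands in level 1.

// Which overflow level a delay ends up in, given the parameters above.
// Purely illustrative; Kafka reaches the same result by recursing through overflowWheel.add.
def levelFor(delayMs: Long, tickMs: Long, wheelSize: Int): Int = {
  var level = 0
  var interval = tickMs * wheelSize // range covered by the current level
  while (delayMs >= interval) {
    interval *= wheelSize // the next level's tick equals this level's interval
    level += 1
  }
  level
}

// levelFor(5, 1, 20) == 0, levelFor(350, 1, 20) == 1, levelFor(5000, 1, 20) == 2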

Executing tasks

Execution is driven by SystemTimer. Its advanceClock method is called in a loop by a dedicated thread, which polls a bucket from delayQueue (this is the same queue that TimingWheel uses). Once a bucket is polled, bucket.flush walks the task list of that wheel slot: each entry is first removed from the list and then re-added through addTimerTaskEntry. If the add fails and the task was not cancelled, it must have expired, so it is submitted to the taskExecutor thread pool for execution; that pool has only a single thread.

def advanceClock(timeoutMs: Long): Boolean = {
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        timingWheel.advanceClock(bucket.getExpiration())
        bucket.flush(reinsert)
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}

private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
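One point the code does not spell out is that reinsert is what cascades tasks down from coarse wheels to finer ones. A hypothetical trace, assuming tickMs = 1 ms and wheelSize = 20 and a task added at t = 0 with a 350 ms delay:

// t = 0   : expiration 350 >= currentTime + interval (0 + 20), so the task overflows into the
//           level-1 wheel (tick 20 ms); its bucket expires at 350 / 20 * 20 = 340.
// t = 340 : delayQueue.poll returns that bucket; timingWheel.advanceClock(340) moves both wheels
//           forward; bucket.flush calls reinsert, and timingWheel.add now places the entry in a
//           level-0 bucket expiring at 350 (since 340 + 1 <= 350 < 340 + 20).
// t = 350 : that level-0 bucket is polled and flushed; this time add returns false because
//           350 < currentTime + tickMs, so addTimerTaskEntry submits the task to taskExecutor.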

The entry point for this execution is the expirationReaper field of DelayedOperationPurgatory, which creates a dedicated thread that repeatedly calls advanceClock; that in turn calls SystemTimer.advanceClock, the method discussed above. The whole process runs on its own thread and does not interfere with the threads adding tasks, but the work done on that thread is fairly involved, so trading O(log n) for O(1) is not free.

private class ExpiredOperationReaper extends ShutdownableThread(
  "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
  false) {

  override def doWork() {
    advanceClock(200L)
  }
}

def advanceClock(timeoutMs: Long) {
  timeoutTimer.advanceClock(timeoutMs)

  // Trigger a purge if the number of completed but still being watched operations is larger than
  // the purge threshold. That number is computed by the difference btw the estimated total number of
  // operations and the number of pending delayed operations.
  if (estimatedTotalOperations.get - delayed > purgeInterval) {
    // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
    // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
    // a little overestimated total number of operations.
    estimatedTotalOperations.getAndSet(delayed)
    debug("Begin purging watch lists")
    val purged = allWatchers.map(_.purgeCompleted()).sum
    debug("Purged %d elements from watch lists.".format(purged))
  }
}
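To make the purge trigger concrete, here is a hypothetical reading of the condition above (the numbers are invented; purgeInterval corresponds to the broker's purgatory purge interval setting, e.g. fetch.purgatory.purge.interval.requests):

// Invented numbers, only to illustrate the condition in advanceClock above.
val estimatedTotalOperations = 10000L // roughly every operation ever handed to the purgatory
val delayed = 2000L                   // operations still pending in the timeout timer
val purgeInterval = 1000L             // configured purge threshold

// Operations that completed but are still sitting in the watch lists:
// 10000 - 2000 = 8000 > 1000, so the watch lists get purged on this pass.
val shouldPurge = estimatedTotalOperations - delayed > purgeInterval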

Summary

  • Many articles online describe how tasks are assigned to the different wheel levels, but not how they are actually executed.
