// Span covered by one full rotation of this wheel: tickMs multiplied by the number of slots.
private[this] val interval = tickMs * wheelSize

// One TimerTaskList per slot; every list is constructed with the same taskCounter.
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs

// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
@volatile private[this] var overflowWheel: TimingWheel = null
/**
 * Tries to place a timer task entry into this wheel, or into the overflow wheel when the
 * entry's expiration lies beyond this wheel's interval.
 *
 * @param timerTaskEntry the entry to schedule; its `expirationMs` and `cancelled` are read
 * @return `false` if the entry is cancelled or its expiration is earlier than
 *         `currentTime + tickMs`; `true` if it was added to one of this wheel's buckets;
 *         otherwise whatever `overflowWheel.add` returns
 */
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired
    false
  } else if (expiration < currentTime + interval) {
    // Put in its own bucket
    val virtualId = expiration / tickMs
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // Set the bucket expiration time
    if (bucket.setExpiration(virtualId * tickMs)) {
      // The bucket needs to be enqueued because it was an expired bucket.
      // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel
      // has advanced and a previous bucket gets reused; further calls to set the expiration
      // within the same wheel cycle will pass in the same value and hence return false, thus
      // the bucket with the same expiration will not be enqueued multiple times.
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}
/**
 * Tries to advance the clock of this wheel.
 *
 * Nothing happens unless `timeMs` is at least one full tick past `currentTime`. When it is,
 * `currentTime` becomes `timeMs` rounded down to a multiple of `tickMs`, and the same
 * rounded value is passed on to the overflow wheel if one exists.
 *
 * @param timeMs the time to advance to
 */
def advanceClock(timeMs: Long): Unit = {
  if (timeMs >= currentTime + tickMs) {
    currentTime = timeMs - (timeMs % tickMs)

    // Try to advance the clock of the overflow wheel if present
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
} // closes the enclosing definition; its header is presumably above this excerpt
// Function value that forwards a timer task entry to addTimerTaskEntry.
private[this] val reinsert: TimerTaskEntry => Unit = entry => addTimerTaskEntry(entry)
/**
 * Adds the entry to the timing wheel. If the wheel rejects it (`add` returns `false`) and the
 * entry is not cancelled, its task is submitted to the task executor instead.
 *
 * @param timerTaskEntry the entry to schedule or run
 */
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
// Trigger a purge if the number of completed but still being watched operations is larger than // the purge threshold. That number is computed by the difference btw the estimated total number of // operations and the number of pending delayed operations. if (estimatedTotalOperations.get - delayed > purgeInterval) { // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") val purged = allWatchers.map(_.purgeCompleted()).sum debug("Purged %d elements from watch lists.".format(purged)) } }