阅读完需:约 64 分钟
海量定时任务管理的问题
一个大型内容审核平时,在运营设定审核了内容的通过的时间,到了这个时间之后,相关内容自动审核 通过, 本是个小的需求,但是考虑到如果需要定时审核的东西很多,这样大量的定时任务带来的一系列问题, 海量定时任务管理的场景非常多,在实际项目中,存在大量需要定时或是延时触发的任务, 比如电商中,延时需要检查订单是否支付成功,是否配送成功,定时给用户推送提醒等等
方案一 单定时器方案
描述: 把所有需要定时审核的资源放到redis中,例如sorted set中,需要审核通过的时间作为score值。 后台启动一个定时器,定时轮询sortedSet,当score值小于当前时间,则运行任务审核通过。
问题 这个方案在小批量数据的情况下没有问题, 但是在大批量任务的情况下就会出现问题了,因为每次都要轮询全量的数据,逐个判断是否需要执行, 一旦轮询任务执行比较长,就会出现任务无法按照定时的时间执行的问题。
方案二 多定时器方案
描述 每个需要定时完成的任务都启动一个定时任务,然后等待完成之后销毁
问题 这个方案带来的问题很明显,定时任务比较多的情况下,会启动很多的线程,这样服务器会承受不了之 后崩溃。 基本上不会采取这个方案
方案三 redis的过期通知功能
描述 和方案一类似,针对每一个需要定时审核的任务,设定过期时间, 过期时间也就是审核通过的时间,订阅redis的过期事件,当这个事件发生时,执行相应的审核通过任 务。
问题 这个方案来说是借用了redis这种中间件来实现我们的功能,这中实际上属于redis的发布订阅功能中的 一部分, 针对redis发布订阅功能是不推荐我们在生产环境中做业务操作的, 通常redis内部(例如redis集群节点上下线,选举等等来使用),我们业务系统使用它的这个事件会产 生如下两个问题
- redis发布订阅的不稳定问题
- redid发布订阅的可靠性问题
https://my.oschina.net/u/2457218/blog/3065021 (redis的发布订阅缺陷)
方案四 Hash分层记时轮(分层时间轮)算法
这个东西就是专为大批量定时任务管理而生。
http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf
什么是时间轮
时间轮这个技术其实出来很久了,在kafka、zookeeper等技术中都有时间轮使用的方式。
时间轮的基本概念 时间轮这个技术其实出来很久了,在kafka、zookeeper、Netty、Dubbo等高性能组件中都有时间轮使 用的方式。 如图,时间轮,从图片上来看,就和手表的表圈是一样,所以称为时间轮。
时间轮其实就是一种环形的数据结构,其设计参考了时钟转动的思维, 可以想象成时钟,分成很多格子,一个格子代表一段时间 时间轮是由多个时间格组成,下图中有8个时间格,每个时间格代表当前时间轮的基本时间跨度 (tickDuration),其中时间轮的时间格的个数是固定的。
图中,有8个时间格(槽),假设每个时间格的单位为100ms,那么整个时间轮走完一圈需要800ms。每100ms指针会沿着顺时针方向移动一个时间单位,这个单位可以代表时间精度这个单位可以设置,比如以秒为单位,也可以以一小时为单位。
通过指针移动,来获得每个时间格中的任务列表,然后遍历这一个时间格中的双向链表来执行任务,以此循环。
时间轮是以时间作为刻度, 组成的一个环形队列,这个环形队列采用数组来实现,数组的每个元素称为槽 Bucket
每个槽位可以放一个定时任务列表,叫HashedWheelBucket
每个槽位可以是一个双向链表,其中可以设置一个 sentinel 哨兵节点, 作为添加任务和删除任务的起始节点。
槽位链表的每一项表示一个定时任务项(HashedWhellTimeout),其中封装了真正的定时任务TimerTask。
简单来说:
时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。
把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。
时间轮的模型能够高效管理各种任务:
- 延时任务、
- 周期任务、
- 通知任务。
时间轮算法在很多框架中都有用到,比如 Dubbo、Netty、Kafka 等。
时间轮算法也是一个比较经典的设计。
时间轮与基于排序链表的定时器的不同
在时间轮内,指针指向轮子上的一个槽。 它以恒定的速率顺时针转动。没转动一步就指向下一个槽,每次转动称之为一个tick。 一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。
时间轮共有N个槽,因此它运转一周的时间是N*si。 每个槽指向一个定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差N*si的整数倍。 时间轮正式利用这个关系将定时器散列到不同的链表中。
假如:现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中: ts = (cs + (ti / si)) % N
基于排序链表的定时器,使用唯一的链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低。
而时间轮使用哈希表的思想,将定时器散列到不同的链表上。 这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。
很显然,对时间轮而言:
- 要提高定时精度,就要使si值足够小,时间间隔越短, 精度越高;
- 要提高执行效率,则要求N值足够大, 槽位越多,每个槽位的任务越少。
上图描述的是一个简单的时间轮,仅仅一个轮子。 而复杂的时间轮可能有多个轮子,不同轮子拥有不同的粒度。
时间轮的特点
时间轮是一个高性能,低消耗的数据结构,它适合用非准实时,延迟的短平快任务,例如心跳检测。 比如Netty动辄管理100w+的连接,每一个连接都会有很多超时任务。 比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量的资源。 在Netty中的一个典型应用场景是判断某个连接是否idle,如果idle(如客户端由于网络原因导致到服 务器的心跳无法送达),则服务器会主动断开连接,释放资源。 得益于Netty NIO的优异性能,基于Netty开发的服务器可以维持大量的长连接,单台8核16G的云主机 可以同时维持几十万长连接,及时掐掉不活跃的连接就显得尤其重要。
时间轮的使用场景
时间轮的本质是一种类似延迟任务队列的实现, 那么它的特点如上所述,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性能,低消耗。
应用场景大致有: 心跳检测(客户端探活) 会话、请求是否超时 消息延迟推送 业务场景超时取消(订单、退款单等)
时间轮的思想应用范围非常广泛,各种操作系统的定时任务调度,Crontab,还有基于java的通信框架 Netty中也有时间轮的实现, 几乎所有的时间任务调度系统采用的都是时间轮的思想。 至于采用round型的基础时间轮还是采用分层时间轮,看实际需要吧,时间复杂度和实现复杂度的取舍
时间轮的使用
这里使用的时间轮是Netty这个包中提供的,使用方法比较简单。 先构建一个HashedWheelTimer时间轮。
-
tickDuration
: 100 ,表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是 指针100ms跳动一次,每次跳动一个窗格 -
ticksPerWheel
:1024,表示时间轮上一共有多少个窗格,分配的窗格越多,占用内存空间就越大 -
leakDetection
:是否开启内存泄漏检测。 -
maxPendingTimeouts
[可选参数],最大允许等待的任务数,默认没有限制。 - 通过
newTimeout
() 把需要延迟执行的任务添加到时间轮中
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16);
final long start = System.currentTimeMillis();
Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task execute,current timestamp=" + (System.currentTimeMillis() - start));
countDownLatch.countDown();
}
}, 2000, TimeUnit.MILLISECONDS);
//Thread.sleep(1000);
//timeout.cancel();
countDownLatch.await();
System.out.println("============================" + (System.currentTimeMillis() - start));
timer.stop();
}
时间轮的原理解析
时间轮的整体原理,分为几个部分。
时间轮至少需要提供4个功能:
- 加入任务
- 执行任务
- 删除任务
- 沿着时间刻度前进
创建时间轮
时间轮本质上是一个环状数组,比如我们初始化时间轮时:ticksPerWheel=8,那么意味着这个环状数 组的长度是8
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
添加任务
- 当通过newTimeout()方法添加一个延迟任务时,该任务首先会加入到一个阻塞队列中。
- 然后会有一个定时任务从该队列获取任务,添加到时间轮的指定位置.
任务执行
Worker线程按照每次间隔时间转动后,得到该时间窗格中的任务链表,然后从链表的head开始逐个取 出任务,有两个判断条件
- 当前任务需要转动的圈数为0,表示任务是当前圈开始执行
- 当前任务达到了delay时间,也就是
timeout.deadline <= deadline
- 最终调用
timeout.expire()
方法执行任务。
同一时刻存在多个任务
更通用的情况, 同一时刻可能需要执行多个任务, 比如: 每天上午九点除了生成报表之外, 执行发送邮件的任务, 执行创建文件的任务, 执行数据分析的任务
为了存储这些任务,时间轮的数据结构其实类似hashmap,只是key时间刻度值,并且不做hash运算 每一个时间刻度,可以理解为一个槽位, 如果有多个任务需要执行呢?一个槽位可以指向一个数组或者链表,用来存放该刻度需要执行的任务
同一时刻存在多个任务时,只要把该刻度对应的链表全部遍历一遍,执行(扔到线程池中异步执行)其 中的任务即可。
时间刻度不够用怎么办
如果任务不只限定在一天之内呢? 比如我有个任务,需要每周一上午九点执行,我还有另一个任务,需要每周三的上午九点执行。 大概的解决办法是:
- 增大时间轮的刻度
- 列表中的任务中添加round属性
- 分层时间轮
增大时间轮的刻度
一天24个小时,一周168个小时,为了解决上面的问题,我可以把时间轮的刻度(槽)从12个增加到 168个,
比如现在是星期二上午10点钟,那么下周一上午九点就是时间轮的第9个刻度,这周三上午九点就是时间轮的第57个刻度
仔细思考一下,会发现这中方式存在几个缺陷:
- 时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就2个任务,我得移动720 次,其中718次是无用功。
- 时间刻度太多会导致存储空间变大,利用率变低,比如一个月就2个任务,我得需要大小是720的 数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。
列表中的任务中添加round属性
这次我不增加时间轮的刻度了,刻度还是24个,现在有三个任务需要执行
- 任务一每周二上午九点。
- 任务二每周四上午九点。
- 任务三每个月12号上午九点。
比如现在是9月11号星期二上午10点,时间轮转一圈是24小时,到任务一下次执行(下周二上午 九点), 需要时间轮转过6圈后,到第7圈的第9个刻度开始执行。 任务二下次执行第3圈的第9个刻度,任务三是第2圈的第9个刻度。
时间轮每移动到一个刻度时,遍历任务列表,把round值-1,然后取出所有round=0的任务执行。
这样做能解决时间轮刻度范围过大造成的空间浪费,但是却带来了另一个问题:
- 时间轮每次都需要遍历任务列表,耗时增加,当时间轮刻度粒度很小(秒级甚至毫秒级)
- 任务列表又特别长时,这种遍历的办法是不可接受的。
当然,对于大多数场景,这种方法还是适用的
有没有既节省空间,又节省时间的办法呢? 有的!
有一种分层时间轮, 可以解决做到既节省空间,又节省时间
分层时间轮
分层时间轮是这样一种思想:
- 针对时间复杂度的问题:
- 不做遍历计算round,凡是任务列表中的都应该是应该被执行的,直接全部取出来执行。
- 针对空间复杂度的问题:
- 分层,每个时间粒度对应一个时间轮,多个时间轮之间进行级联协作。
第一点很好理解,第二点有必要举个例子来说明。 比如我有三个任务:
- 任务一每周二上午九点。
- 任务二每周四上午九点。
- 任务三每个月12号上午九点
拿任务三来说,任务三得到执行的前提是, 时间刻度先得来到12号这一天,然后才需要关注其更细一级的时间单位:上午9点。 基于这个思想,我们可以设置三个时间轮:月轮、周轮、天轮。
- 月轮的时间刻度是天。
- 周轮的时间刻度是天。
- 天轮的时间刻度是小时。
初始添加任务时: 任务一添加到周轮上, 任务二添加到周轮上 任务三添加到月轮上。 三个时间轮以各自的时间刻度不停流转。
当周轮移动到刻度2(星期二)时,取出这个刻度下的任务1,丢到天轮上,天轮接管该任务,到9点执行。 当周轮移动到刻度4(周四)时,取出这个刻度下的任务2,丢到天轮上,天轮接管该任务,到9点执行。 当月轮移动到刻度12(12号)时,取出这个刻度下的任务,丢到天轮上,天轮接管该任务,到9点执行。 这样就可以做到既不浪费空间,有不浪费时间。
HashedWheelTimer的构造
这里我们可以先看下HashedWheelTimer的UML图,能够对相关组件先有个整体的认识
-
Timer: 定时器接口,提供
提交延时任务newTimeout、停止定时器
等方法 -
HashedWheelTimer: 实现Timer接口,内部包含
工作线程Worker、时间轮wheel、延时任务队列timeouts、线程池taskExecutor等
-
HashedWheelBucket:上面的时间轮wheel是一个HashedWheelBucket数组,
每一个刻度对应一个HashedWheelBucket,而每一个HashedWheelBucket内部是一个HashedWheelTimeout的双向链表
,如下图
- TimerTask: 延时任务接口,内部只提供一个run方法用于执行
- Timeout: 对Timer、TimerTask的封装
-
HashedWheelTimeout: 包含了
任务的执行时间dealline、所需要的圈数remainingRounds、双向链表中上一个以及下一个HashedWheelTimeout、所在的HashedWheelBucket等
大致工作流程如下图:
从上图可以看到,主要分为4步骤,但是准确来说应该是有5步:
- 提交延时任务给
HashedWheelTimer
,延时任务会先放到任务队列timeouts
中 - 工作线程Worker会从任务队列timeouts中获取任务
- 将获取到的
HashedWheelTimeout
任务放到指定的HashedWheelBucket
中 - 取出当前刻度对应的
HashedWheelBucket
的所有HashedWheelTimeout
来执行 - 将刻度tick加1,再回到第二步,如此循环
源码解析
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.15.Final</version>
</dependency>
时间轮接口
package netty.NiEn.TimeWheel.util;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 时间轮接口
*/
public interface Timer {
/**
* newTimeout()添加定时任务
* 如果没有启动时间轮,则启动
*
* @param task 定时任务
* @param delay 延迟时间
* @param unit 延迟时间单位
* @return the timeout
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* 停止时间轮
*
* @return the set
*/
Set<Timeout> stop();
/**
* 动态开关开启
* 1、轮子每 tick ,将格子内所有定时任务执行
* 2、开关开启后的定时任务直接执行,不进入格子。
*/
void openSwitch();
/**
* 动态开关关闭
* 1、轮子每 tick ,只执行过期的定时任务
* 2、新的 newTimeout 添加的定时任务,添加到格子
*/
void closeSwitch();
}
定时任务
package netty.NiEn.TimeWheel.util;
import java.util.concurrent.TimeUnit;
/**
* 定时任务
*/
public interface TimerTask {
/**
* 延时执行定时任务 {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout
*/
void run(Timeout timeout) throws Exception;
}
Timer与TimerTask的封装
package netty.NiEn.TimeWheel.util;
/**
* {@link Timer} 处理 {@link TimerTask} 完成, 返回值
*/
public interface Timeout {
/**
* 获取时间轮引用
*
* @return the timer
*/
Timer timer();
/**
* 获取需要执行的任务
*
* @return the timer task
*/
TimerTask task();
/**
* 定时任务是否过期
*
* @return the boolean
*/
boolean isExpired();
/**
* 定时任务是否取消
*
* @return the boolean
*/
boolean isCancelled();
/**
* 试图取消 {@link TimerTask}
* 如果任务已经执行或取消,它将直接返回
*
* @return 如果取消成功,则返回true
*/
boolean cancel();
}
时间轮
package netty.NiEn.TimeWheel.util;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import static io.netty.util.internal.StringUtil.simpleClassName;
/**
* 时间轮
*/
public class HashedWheelTimer implements Timer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);
/*****************************************************************/
//这里使用的Queue不是普通java自带的Queue的实现,而是使用JCTool–一个高性能的的并发Queue实现包。
//--动态配置提前执行
private volatile boolean dynamicOpen;
//存放纳秒--worker 初始化 startTime
private volatile long startTime;
//存放纳秒--tick的时长,即指针多久转一格
private final long tickDuration;
//轮子基本结构
private final HashedWheelBucket[] wheel;
//用来快速计算任务应该放的格子--位运算
private final int mask;
//用来阻塞 start()的线程
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
//待执行任务队列
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
//待取消任务队列
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
//等待处理计数器
private final AtomicLong pendingTimeouts = new AtomicLong(0);
//最大等待处理次数
private final long maxPendingTimeouts;
/*****************************************************************/
//AtomicIntegerFieldUpdater是JUC里面的类,原理是利用反射进行原子操作。有比AtomicInteger更好的性能和更低得内存占用
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
//声明 worker
private final Worker worker = new Worker();
//worker 线程
private final Thread workerThread;
//定义worker的3个状态:初始化、启动、关闭
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
//当前时间轮的状态
public volatile int workerState = WORKER_STATE_INIT;
/*****************************************************************/
//实例计数器
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
//警告太多的实例
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
//实例数限制
private static final int INSTANCE_COUNT_LIMIT = 64;
/*****************************************************************/
//内存泄露检测器
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
private final ResourceLeakTracker<HashedWheelTimer> leak;
/*****************************************************************/
/**
* 时间轮的构造函数
*
* @param threadFactory {@link ThreadFactory}用来创建执行{@link TimerTask}的 worker 线程 {@link Thread}
* @param tickDuration tick的时长,即指针多久转一格
* @param unit {@code tickDuration}的时间单位
* @param ticksPerWheel 每圈几格
* @param leakDetection 是否开启内存泄露检测;
* 默认设置{@code true}
* 如果工作线程不是守护线程,需设置 false
* @param maxPendingTimeouts 最大等待处理次数;
* 调用{@code newTimeout}的次数超过最大等待处理次数,将抛出异常{@link RejectedExecutionException}
* 如果该值为0或负值,则不考虑最大超时时间限制。
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
//参数校验
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
//创建时间轮基本的数据结构,一个数组,长度为不小于 ticksPerWheel 的最小2的 n 次方
wheel = createWheel(ticksPerWheel);
//这是一个表示符,用来快速计算任务应该放的格子
// 了解,给定一个deadline的定时任务,其应该呆的格子=deadline%wheel.length.但是%操作是个相对耗时的操作,
// 所以使用一种变通的位运算代替:
// 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline&mast == deadline%wheel.length
// java中的HashMap也是使用这种处理方法
mask = wheel.length - 1;
//转换成纳秒处理
this.tickDuration = unit.toNanos(tickDuration);
//校验是否内存溢出。即指针转动的时间间隔不能太长而导致 tickDuration*wheel.length>Long.MAX_VALUE
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d)", tickDuration, Long.MAX_VALUE / wheel.length));
}
//创建 worker 线程
workerThread = threadFactory.newThread(worker);
//这里默认是启动内存泄露检测:当HashedWheelTimer实例超过当前cpu可用核数 *4的时候,将发出警告
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
//设置最大等待处理次数
this.maxPendingTimeouts = maxPendingTimeouts;
//log 警告太多的实例
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
/**
* 创建时间轮 - 使用默认线程工厂,100ms 转一格, 512格 ,启用内存溢出检测,不设置最大等待处理次数
*/
public HashedWheelTimer() {
this(Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 512);
}
/**
* 创建时间轮 - 使用默认线程工厂,512格 ,启用内存溢出检测,不设置最大等待处理次数
*
* @param tickDuration tick的时长,即指针多久转一格
* @param unit {@code tickDuration}的时间单位
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit, 512);
}
/**
* 创建时间轮 - 使用默认线程工厂,启用内存溢出检测,不设置最大等待处理次数
*
* @param tickDuration tick的时长,即指针多久转一格
* @param unit {@code tickDuration}的时间单位
* @param ticksPerWheel 每圈几格
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
/**
* 创建时间轮 - 启用内存溢出检测,不设置最大等待处理次数
*
* @param threadFactory {@link ThreadFactory}用来创建执行{@link TimerTask}的 worker 线程 {@link Thread}
* @param tickDuration tick的时长,即指针多久转一格
* @param unit {@code tickDuration}的时间单位
* @param ticksPerWheel 每圈几格
* 默认设置{@code true}
* 如果工作线程不是守护线程,需设置 false
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
/**
* 创建时间轮 -不设置 最大等待处理次数
*
* @param threadFactory {@link ThreadFactory}用来创建执行{@link TimerTask}的 worker 线程 {@link Thread}
* @param tickDuration tick的时长,即指针多久转一格
* @param unit {@code tickDuration}的时间单位
* @param ticksPerWheel 每圈几格
* @param leakDetection 是否开启内存泄露检测;
* 默认设置{@code true}
* 如果工作线程不是守护线程,需设置 false
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 参数校验
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//等待处理计数器加一
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//设置最大等待处理次数 并 等待处理计数器 大于 最大等待处理次数
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果时间轮没有启动,则启动
start();
//计算任务的deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
//这里定时任务不是直接加到对应的格子中,
//而是先加入到一个待执行任务队列里,然后等到下一个tick的时候,会从队列里取出最多10w个任务加入到指定的格子中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
// 启动时间轮。
// 这个方法其实不需要显示的主动调用,因为在添加定时任务(newTimeout()方法)的时候会自动调用此方法。
// 这个是合理的设计,因为如果时间轮里根本没有定时任务,启动时间轮也是空耗资源
public void start() {
// 判断当前时间轮的状态
// 如果是初始化,则启动worker线程,启动整个时间轮;
// 如果已经启动,则略过;
// 如果已经停止,则报错
// 这里是一个Lock Free的设计。因为可能有多个线程调用启动方法,这里使用AtomicIntegerFieldUpdater原子的更新时间轮的状态
switch(WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待worker线程初始化时间轮的启动时间
while(startTime == 0) {
try {
startTimeInitialized.await();
} catch(InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
//停止时间轮的方法
@Override
public Set<Timeout> stop() {
// worker线程不能停止时间轮,也就是加入的定时任务,不能调用这个方法。
// 不然会有恶意的定时任务调用这个方法而造成大量定时任务失效
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName());
}
// 尝试CAS替换当前状态为--"停止:2"。
// 如果失败,则当前时间轮的状态只能是--"初始化:0"或者"停止:2"。直接将当前状态设置为--"停止:2"
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
//实例计数器减一
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return Collections.emptySet();
}
// 终端 worker 线程
// interrupt()只是改变中断状态而已:
// interrupt()不会中断一个正在运行的线程。
// 这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。
// 更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,
// 那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态.
// 如果线程没有被阻塞,这时调用interrupt()将不起作用;仅仅是设置中断标志位为true
try {
boolean interrupted = false;
while(workerThread.isAlive()) {
//改变中断状态
workerThread.interrupt();
try {
//触发中断异常
workerThread.join(100);
} catch(InterruptedException ignored) {
interrupted = true;
}
}
//当前线程中断
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
//实例计数器加一
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
}
@Override
public void openSwitch() {
dynamicOpen = true;
logger.info("HashedWheelTimer openSwitch");
}
@Override
public void closeSwitch() {
dynamicOpen = false;
logger.info("HashedWheelTimer closeSwitch");
}
//初始化时间轮
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
//参数校验
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// 初始化 ticksPerWheel 的值为 不小于 ticksPerWheel 的最小2的n次方
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
//初始化 wheel 数组
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
// 初始化 ticksPerWheel 的值为 不小于 ticksPerWheel 的最小2的n次方
//这里其实不建议使用这种方式,因为当ticksPerWheel的值很大的时候,这个方法会循环很多次,方法执行时间不稳定,效率也不够。
private static int normalizeTicksPerWheelOld(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while(normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
//推荐使用java8 HashMap的做法:
private static int normalizeTicksPerWheel(int ticksPerWheel) {
//这里参考java8 hashmap的算法,使推算的过程固定
int n = ticksPerWheel - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 这里1073741824 = 2^30,防止溢出
return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1;
}
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// 该对象进行 GC 完成时,进行判断
// 如果我们还没有关闭,然后我们要确保减少活动实例计数。
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
}
}
//警告太多的实例
private static void reportTooManyInstances() {
String resourceType = simpleClassName(HashedWheelTimer.class);
logger.error("You are creating too many " + resourceType + " instances. " + resourceType + " is a shared resource that must be reused across the JVM," + "so that only a few instances are created.");
}
/*=====================================================*/
/* Worker */
/*=====================================================*/
//worker 是时间轮的核心线程类。 tick 的转动,过期任务的处理都是在这个线程中处理的
private final class Worker implements Runnable {
//未处理任务列表
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
//初始化 startTime. 只有所有任务的 deadline 都是相对于这个时间点
startTime = System.nanoTime();
//由于System.nanoTime()可能返回0,甚至负数。
// 并且0是一个标识符,用来判断startTime是否被初始化,所以当startTime=0的时候,重新赋值为1
if (startTime == 0) {
startTime = 1;
}
//唤醒阻塞在start()的线程
startTimeInitialized.countDown();
//只要时间轮的状态为 WORKER_STATE_STARTED就循环的 "转动" tick,循环判断响应格子中的到期任务
do {
final long deadline = waitForNextTick();
// 可能溢出或者被中断的时候会返回负数, 所以小于等于0不管
if (deadline > 0) {
//获取 tick 对应的格子索引
int idx = (int) (tick & mask);
//移除被取消的任务
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
//从任务队列中取出任务加入到对应的格子中
transferTimeoutsToBuckets();
//过期执行格子中的任务---或是开启开关的
bucket.expireTimeouts(deadline, dynamicOpen);
tick++;
}
} while(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 这里应该是时间轮停止了,清除所有格子中的任务,并加入到未处理任务列表,以供stop()方法返回
for(HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将还没有加入到格子中的待处理定时任务队列中的任务取出,
// 如果是未取消的任务,则加入到未处理任务队列中,以供stop()方法返回
for(; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//处理取消的任务
processCancelledTasks();
}
//返回不可修改的 未处理任务列表 视图
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
//将 newTimeout() 方法中 待处理定时任务队列中的任务加入到指定的格子中
private void transferTimeoutsToBuckets() {
//每次 tick 只处理 10w个任务,以免阻塞 worker 线程
for(int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
//如果没有任务了,直接跳出循环
if (timeout == null) {
break;
}
//还没有放入到格子中就取消了,直接略过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
//如果动态开关打开,直接调用执行任务,不进入格子
if (dynamicOpen) {
// logger.info("-----未进格子node-----:" + timeout);
timeout.expire();
continue;
}
//计算任务需要经过多少个 tick
long calculated = timeout.deadline / tickDuration;
//计算任务的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;
//如果任务在 timeouts 队列里面放久了,以至于已经过了执行时间,
//这个时候就使用当前tick, 也就是放到当前 bucket, 此方法调用完后就会被执行.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
//将任务加入到相应的格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
// logger.info("=====进入格子node=====:" + timeout);
}
}
//将取消的任务取出,并从格子中移除
private void processCancelledTasks() {
for(; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
break;
}
try {
//从格子中移除自身
timeout.remove();
} catch(Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
//sleep,直到下次 tick 到来,然后返回该次 tick 和启动时间之间的时长
private long waitForNextTick() {
//下次 tick 的时间点,用于计算需要 sleep 的时间
long deadline = tickDuration * (tick + 1);
// System.out.println("tick=" + tick + ", deadline=" + deadline);
for(; ; ) {
//计算需要 sleep 的时间,之所以加 999999 后再除以 1000000,是为了保证足够的 sleep 时间
//例如:当 deadline - currentTime = 2000002 的时候,如果不加 999999,则只睡了 2ms
//而 2ms 其实是未达到 deadline 这个时间点的,所以为了使上述情况能 sleep 足够的时间,加上999999后,会多睡1ms
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//毫秒
// System.out.println("deadline=" + deadline + ",currentTime=" + currentTime + ",sleepTimeMs=" + sleepTimeMs);
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// 这里是因为windows平台的定时调度最小单位为10ms,如果不是10ms的倍数,可能会引起sleep时间不准确
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch(InterruptedException ignored) {
//调用HashedWheelTimer.stop()时优雅退出
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
}// worker end
/*=====================================================*/
/* HashedWheelBucket */
/*=====================================================*/
/**
* HashedWheelBucket 用来存放 HashedWheelTimeout,结构类似于 LinkedList。
* 提供了 expireTimeouts(long deadline) 方法来过期并执行格子中的定时任务
*/
private static final class HashedWheelBucket {
//LinkedList结构
//指向格子中任务的首尾
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
//基础链表添加操作
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
//过期并执行格子中的到期任务,tick 到该格子的时候,worker 线程会调用这个方法,根据 deadline 和 remainingRounds 判断任务是否过期
public void expireTimeouts(long deadline, boolean dynamicOpen) {
HashedWheelTimeout timeout = head;
//遍历格子中的所有定时任务
while(timeout != null) {
//先保存 next ,因为移除后 next 将被设置为 null
HashedWheelTimeout next = timeout.next;
//开启动态开关--直接执行任务
if (dynamicOpen) {
next = remove(timeout);
timeout.expire();
} else if (timeout.remainingRounds <= 0) { //定时任务到期
next = remove(timeout);
//过期并执行任务
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
//如果 round 数已经为0,deadline 却大于当前格子的 deadline,说放错格子了,这种情况应该不会出现
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {//定时任务被取消
next = remove(timeout);
} else {
timeout.remainingRounds--;
}
timeout = next;
}
}
//基础链表 移除 node 操作
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
//删除已处理或取消的任务,更新链表
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
//即使头也是尾
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
//如果是尾部,尾部向前更新一个节点
tail = timeout.prev;
}
//prev, next and bucket 附空并允许 GC
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
//计数器减一
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
//清除格子并返回所有未过期、未取消的任务
public void clearTimeouts(Set<Timeout> set) {
for(; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
//未过期、未取消
set.add(timeout);
}
}
//链表的 poll 操作
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
//prev, next and bucket 附空并允许 GC
head.prev = null;
head.next = null;
head.bucket = null;
return head;
}
} //HashedWheelBucket end
/*=====================================================*/
/* HashedWheelTimeout */
/*=====================================================*/
/**
* HashedWheelTimeout是一个定时任务的内部包装类,双向链表结构。会保存 定时任务到期执行的任务、deadline、round等信息
*/
private static final class HashedWheelTimeout implements Timeout {
//定义定时任务的3个状态:初始化、取消、过期
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
//用 CAS (比较并交换-乐观锁) 方式更新定时任务状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
//时间轮引用
private final HashedWheelTimer timer;
//具体到期需要执行的任务
private final TimerTask task;
//最后期限
private final long deadline;
private volatile int state = ST_INIT;
//离任务执行的轮数
//当将此任务加入到格子中时,计算该值 。 每过一轮,该值减一
long remainingRounds;
//使用双向链表结构,由于只有 worker 线程访问,这里不需要 synchronization/volatile
HashedWheelTimeout next;
HashedWheelTimeout prev;
//定时任务所在的格子
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public Timer timer() {
return timer;
}
@Override
public TimerTask task() {
return task;
}
@Override
public boolean cancel() {
//这里只是修改状态为 ST_CANCELLED,会在下次 tick 时,在格子中移除
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
//加入到时间轮的待取消队列,并在每次 tick 的时候,从相应格子中移除
//因此,这意味着我们将有最大的 GC 延迟。1 tick 时间足够好。这样我们可以再次使用我们的 MpscLinkedQueue 队列,尽可能减少锁定/开销。
timer.cancelledTimeouts.add(this);
return true;
}
/**
* 比较并设置状态
*
* @param expected 预期值--原值
* @param state 更新的值
* @return
*/
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}
@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}
//从格子中移除自身
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
//计数器减一
timer.pendingTimeouts.decrementAndGet();
}
}
//过期并执行任务
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch(Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
StringBuilder buf = new StringBuilder(192)
.append(simpleClassName(this))
.append('(');
buf.append("remainingRounds:" + remainingRounds);
buf.append(", deadline: ");
if (remaining > 0) {
buf.append(remaining)
.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
.append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(", task: ")
.append(simpleClassName(task()))
.append(')')
.toString();
}
} //HashedWheelTimeout end
}
测试时间轮
package netty.NiEn.TimeWheel.util;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class HashedWheelTimerTest {
public static void main(String[] args) throws InterruptedException {
testWorker();
}
public static void testWorker() throws InterruptedException {
final HashedWheelTimer timerProcessed = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
final AtomicInteger counter = new AtomicInteger();
System.out.println("start:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()));
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task1:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()) + "==" + timeout);
}
}, 10, TimeUnit.MINUTES);
Thread.sleep(1000);
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task2:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()) + "==" + timeout);
}
}, 20, TimeUnit.MINUTES);
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task3:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()) + "==" + timeout);
}
}, 30, TimeUnit.MINUTES);
timerProcessed.openSwitch();
Thread.sleep(3000);
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task4:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()) + "==" + timeout);
}
}, 4, TimeUnit.SECONDS);
Thread.sleep(3000);
timerProcessed.closeSwitch();
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("task5:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(new Date()) + "==" + timeout);
}
}, 5, TimeUnit.SECONDS);
Thread.sleep(10000);
}
public void testScheduleTimeoutShouldNotRunBeforeDelay() throws InterruptedException {
final Timer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
final CountDownLatch barrier = new CountDownLatch(1);
final Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
fail("This should not have run");
barrier.countDown();
}
}, 10, TimeUnit.SECONDS);
assertFalse(barrier.await(3, TimeUnit.SECONDS));
assertFalse("timer should not expire", timeout.isExpired());
timer.stop();
}
public void testScheduleTimeoutShouldRunAfterDelay() throws InterruptedException {
final Timer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
;
final CountDownLatch barrier = new CountDownLatch(1);
final Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
barrier.countDown();
}
}, 2, TimeUnit.SECONDS);
assertTrue(barrier.await(3, TimeUnit.SECONDS));
assertTrue("timer should expire", timeout.isExpired());
timer.stop();
}
public void testStopTimer() throws InterruptedException {
final Timer timerProcessed = new HashedWheelTimer();
;
for(int i = 0; i < 3; i++) {
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 1, TimeUnit.MILLISECONDS);
}
Thread.sleep(1000L); // sleep for a second
assertEquals("Number of unprocessed timeouts should be 0", 0, timerProcessed.stop().size());
final Timer timerUnprocessed =new HashedWheelTimer();
;
for(int i = 0; i < 5; i++) {
timerUnprocessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 5, TimeUnit.SECONDS);
}
Thread.sleep(1000L); // sleep for a second
assertFalse("Number of unprocessed timeouts should be greater than 0", timerUnprocessed.stop().isEmpty());
}
public void testTimerShouldThrowExceptionAfterShutdownForNewTimeouts() throws InterruptedException {
final Timer timer = new HashedWheelTimer();
;
for(int i = 0; i < 3; i++) {
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 1, TimeUnit.MILLISECONDS);
}
timer.stop();
Thread.sleep(1000L); // sleep for a second
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
fail("This should not run");
}
}, 1, TimeUnit.SECONDS);
}
public void testTimerOverflowWheelLength() throws InterruptedException {
final HashedWheelTimer timer = new HashedWheelTimer(
Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32);
final AtomicInteger counter = new AtomicInteger();
timer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
counter.incrementAndGet();
timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
}, 1, TimeUnit.SECONDS);
Thread.sleep(3500);
assertEquals(3, counter.get());
timer.stop();
}
public void testExecutionOnTime() throws InterruptedException {
int tickDuration = 200;
int timeout = 125;
int maxTimeout = 2 * (tickDuration + timeout);
final HashedWheelTimer timer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS);
final BlockingQueue<Long> queue = new LinkedBlockingQueue();
int scheduledTasks = 100000;
for(int i = 0; i < scheduledTasks; i++) {
final long start = System.nanoTime();
timer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
queue.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}, timeout, TimeUnit.MILLISECONDS);
}
for(int i = 0; i < scheduledTasks; i++) {
long delay = queue.take();
assertTrue("Timeout + " + scheduledTasks + " delay " + delay + " must be " + timeout + " < " + maxTimeout,
delay >= timeout && delay < maxTimeout);
System.out.println("Timeout + " + scheduledTasks + " delay " + delay + " must be " + timeout + " < " + maxTimeout);
}
timer.stop();
}
}
HashedWheelTimer会自动的分层吗?
HashedWheelTimer 不会自动进行分层。
它所采用的分层时间轮结构,是通过构造方法传入不同的 tickDuration 参数来实现的。
当我们在构造 HashedWheelTimer 时,传入不同的 tickDuration 参数,例如:
- tickDuration = 50 ms,它会产生秒级时间轮,以实现高精度秒级定时。
- tickDuration = 1 s, 它会产生分钟级时间轮,以实现分钟级定时。
- tickDuration = 1 min,它会产生小时级时间轮,以实现小时级定时。
但是,一旦 HashedWheelTimer 构造完成,其时间轮的时效就固定了。此后添加的所有定时任务都会放入此时间轮中处理。
HashedWheelTimer 不会自动根据定时任务的时间范围来动态变更时间轮的时效或进行分层。
举例来说:
- 我们构造 HashedWheelTimer(50ms),获得一个秒级时间轮。
- 先添加了几个 10 秒定时任务,秒级时间轮可以很好地处理。
- 后续又添加了几个 1 小时定时任务,这些任务也会被放入秒级时间轮中。
- 尽管 1 小时时间范围更适合分配到分钟级时间轮,但是 HashedWheelTimer 不会自动进行这样的分层调整。
所以,HashedWheelTimer 实现分层时间轮的效果,主要依赖于我们在构造它时传入的 tickDuration 参数。
一旦构造完成,其时间轮的时效是固定的,不会根据后续添加的定时任务的时间范围来自动变更或进行分层。
这也意味着,我们在使用 HashedWheelTimer 时,需要考虑可能会添加的定时任务的时间范围,选择一个较为恰当的 tickDuration 来构造时间轮。
如果 time range 跨度较大,甚至需要多个层次,那么需要分别构造秒级、分钟级和小时级的时间轮来配合使用。
所以,总结来说:
HashedWheelTimer 不会自动进行分层或动态调整时间轮的时效。
它实现分层时间轮的效果主要依赖于构造时传入的 tickDuration 参数。
一旦构造完成,其时间轮的时效是固定的,不会根据后续任务自动调整。
我们需要在构造 HashedWheelTimer 时考虑可能的定时任务 time range,选择一个适当的 tickDuration。
如果需要多层级时间轮,需要分别构造秒级、分钟级和小时级的 HashedWheelTimer 实例来使用。
kafka的时间轮会自动分层?
是的,Kafka 的时间轮是自动分层的。Kafka 的时间轮由多个时间轮级别组成,每个级别的时间轮都包含多个时间槽,每个时间槽代表一个时间窗口。Kafka 的时间轮会根据时间轮级别的不同,将时间槽分成不同的层次。
例如,Kafka 中的秒级时间轮通常会分成 60 个时间槽,每个时间槽代表一秒钟。如果一个消息的时间戳落在了某个时间槽内,那么这个消息就会被放到这个时间槽对应的队列中。当时间轮转动到下一个时间槽时,就会将上一个时间槽中的消息取出并发送给消费者。当秒级时间轮的时间槽满了之后,Kafka 会自动将这些时间槽合并成一个更大的时间槽,然后放到分钟级时间轮中。分钟级时间轮也会在时间槽满了之后自动合并成更大的时间槽,然后放到小时级时间轮中,以此类推。
这种自动分层的机制可以让 Kafka 的时间轮更加高效地处理大量的消息,并保证消息的及时性和准确性。
单层时间轮
下图是一个单层时间轮,假设下图时间轮的周期是1秒,时间轮中有10个槽位,则每个槽位代表的时间就是100ms,现在有A、B、C三个任务,分别是任务A(230ms后执行)、B(450ms之后运行)、C(1950ms之后运行)。我们可以看到任务A被放到了槽位2,任务B被放到了槽位4,任务C被放到了槽位9,当时间轮转动到对应的槽时,就会从槽中取出任务判断是否需要执行。这个里面涉及一个周期概念,任务C具有一个周期,当时间轮完成一次循环,下次执行到9的时候,任务C才会执行,目前Dubbo中采用单层时间轮机制。
多层时间轮
对应多层时间轮就是具有多个时间轮,下图中具有两个时间轮,第一层时间轮还是保持和单层时间轮一样,第二层时间轮为一个周期为10秒的时间轮,还是按照上述案例,这个时候A、B任务还是被分配在第一层时间轮,对于C任务,当完成完成一个周期以后,第二层时间轮刻度会执行到1的位置,同时任务C也会被取出到第一层时间轮9的位置,当一层时间轮再次转动到9的位置的时候,则会触发任务C,这种将第二层的任务取出放入第一层中称为降层,它是为了保证任务被处理的时间精度。Kafka内部就是采用的这种多层时间轮机制。
时间轮应用场景
- 心跳检查,Netty中的心跳检查就是采用时间轮形式;
- 超时处理,目前Dubbo中采用时间轮来处理超时调用;
- 分布式锁续期,目前在分布式锁Redisson通过时间轮定时给分布式锁续期;
- 定时任务,对于分布式定时任务的调度就是采用的时间轮设计;
- 消息中间件,延时队列消息的中间件一般采用时间轮实现;
Kafka时间轮
时间轮的数据结构
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。在Kafka源码中对这个TimeTaskList是用一个名称为buckets的数组表示的,所以后面介绍中可能TimerTaskList也会被称为bucket。
针对上图的几个名词简单解释下:
- tickMs:时间轮由多个时间格组成,每个时间格就是tickMs,它代表当前时间轮的基本时间跨度。
- wheelSize:代表每一层时间轮的格数
- interval:当前时间轮的总体时间跨度,interval=tickMs × wheelSize
- startMs:构造当层时间轮时候的当前时间,第一层的时间轮的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上层时间轮的startMs为下层时间轮的currentTime。
- currentTime:表示时间轮当前所处的时间,currentTime是tickMs的整数倍(通过currentTime=startMs – (startMs % tickMs来保正currentTime一定是tickMs的整数倍),这个运算类比钟表中分钟里65秒分钟指针指向的还是1分钟)。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。
时间轮中的任务存放
若时间轮的tickMs=1ms,wheelSize=20,那么可以计算得出interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插入进来会存放到时间格为2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的TimeTaskList中的任务做相应的到期操作。此时若又有一个定时为8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插入进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入到原本已经到期的时间格1中。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间。
时间轮的升降级
如果此时有个定时为350ms的任务该如何处理?直接扩充wheelSize的大小么?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。Kafka为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
复用之前的案例,第一层的时间轮tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即为20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。
刚才提到的350ms的任务,不会插入到第一层时间轮,会插入到interval=20*20的第二层时间轮中,具体插入到时间轮的哪个bucket呢?先用350/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350会放在第17个bucket。如果此时有一个450ms后执行的任务,那么会放在第三层时间轮中,按照刚才的计算公式,会放在第0个bucket。第0个bucket里会包含[400,800)ms的任务。随着时间流逝,当时间过去了400ms,那么450ms后就要执行的任务还剩下50ms的时间才能执行,此时有一个时间轮降级的操作,将50ms任务重新提交到层级时间轮中,那么此时50ms的任务根据公式会放入第二个时间轮的第2个bucket中,此bucket的时间范围为[40,60)ms,然后再经过40ms,这个50ms的任务又会被监控到,此时距离任务执行还有10ms,同样将10ms的任务提交到层级时间轮,此时会加入到第一层时间轮的第10个bucket,所以再经过10ms后,此任务到期,最终执行。
整个时间轮的升级降级操作是不是很类似于我们的时钟? 第一层时间轮tickMs=1s, wheelSize=60,interval=1min,此为秒钟;第二层tickMs=1min,wheelSize=60,interval=1hour,此为分钟;第三层tickMs=1hour,wheelSize为12,interval为12hours,此为时钟。而钟表的指针就对应程序中的currentTime,这个后面分析代码时候会讲到(对这个的理解也是时间轮理解的重点和难点)。
任务添加和驱动时间轮滚动核心流程图
仿Kafka代码
SystemTimer
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@SuppressWarnings("AlibabaThreadPoolCreation")
public class SystemTimer implements Timer {
private String executorName;
private Long tickMs = 1L;
private Integer wheelSize = 20;
private Long startMs = System.currentTimeMillis();
private final TimingWheel timingWheel;
private final DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
private final AtomicInteger taskCounter = new AtomicInteger(0);
public SystemTimer(String executorName) {
this.executorName = executorName;
timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue);
}
public SystemTimer(String executorName, Long tickMs, Integer wheelSize) {
this.executorName = executorName;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue);
}
public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
this.executorName = executorName;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue);
}
/**
* 驱动线程池
*/
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(1, (runnable) -> new Thread(runnable, "executor-" + executorName));
/**
* 用于在勾选时保护数据结构的锁
*/
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
@Override
public void add(TimerTask timerTask) {
readLock.lock();
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis()));
} finally {
readLock.unlock();
}
}
/**
* 在 Systemtimer 中添加一个任务,任务被包装为一个TimerTaskEntry
*
* @param timerTaskEntry timerTaskEntry
*/
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
//先判断是否可以添加进时间轮中,如果不可以添加进去代表任务已经过期或者任务被取消,
// 注意这里的timingWheel持有上一层时间轮的引用,所以可能存在递归调用
if (!timingWheel.add(timerTaskEntry)) {
// 已经过期或取消
// 过期任务直接线程池异步执行掉
if (!timerTaskEntry.cancelled()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
@Override
public Boolean advanceClock(Long timeoutMs) {
try {
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) {
writeLock.lock();
try {
while (bucket != null) {
// 驱动时间轮
timingWheel.advanceClock(bucket.getExpiration());
//循环buckek也就是任务列表,任务列表一个个继续添加进时间轮以此来升级或者降级时间轮,把过期任务找出来执行
bucket.flush(this::addTimerTaskEntry);
//这里就是从延迟队列取出bucket,bucket是有延迟时间的,取出代表该bucket过期,我们通过bucket能取到bucket包含的任务列表
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} catch (InterruptedException e) {
e.printStackTrace();
return Boolean.FALSE;
}
}
@Override
public Integer size() {
return taskCounter.get();
}
@Override
public void shutdown() {
taskExecutor.shutdown();
}
}
Timer
public interface Timer {
/**
* 将新任务添加到此执行器。
* 它将在任务延迟后执行(从提交时开始)
*
* @param timerTask 需要添加的任务
*/
void add(TimerTask timerTask);
/**
* 驱动内部时钟,执行在经过的超时持续时间内到期的任何任务。
*
* @param timeoutMs 等待过期时间
* @return 是否执行了任何任务
*/
Boolean advanceClock(Long timeoutMs);
/**
* 获取挂起执行的任务数
*
* @return 任务数
*/
Integer size();
/**
* 关闭计时器服务,保留未执行的挂起任务
*/
void shutdown();
}
TimerTask
import lombok.Data;
@Data
public abstract class TimerTask implements Runnable {
/**
* timestamp in millisecond
*/
protected Long delayMs;
public TimerTask(Long delayMs) {
this.delayMs = delayMs;
}
private TimerTaskEntry timerTaskEntry;
public TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
public void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
}
public void cancel() {
synchronized (this) {
if (timerTaskEntry != null) {
timerTaskEntry.remove();
}
timerTaskEntry = null;
}
}
}
TimerTaskEntry
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimerTaskEntry implements Comparable<TimerTaskEntry> {
protected TimerTask timerTask;
private Long expirationMs;
protected volatile TimerTaskList list;
protected TimerTaskEntry next;
protected TimerTaskEntry prev;
public TimerTaskEntry(TimerTask timerTask, Long expirationMs) {
this.timerTask = timerTask;
this.expirationMs = expirationMs;
//如果此timerTask已由现有的计时器任务条目保留,
// 则setTimerTaskEntry将删除它。
if (timerTask != null) {
timerTask.setTimerTaskEntry(this);
}
}
public Boolean cancelled() {
return timerTask.getTimerTaskEntry() != this;
}
public void remove() {
TimerTaskList currentList = this.list;
// 如果在另一个线程将条目从一个任务条目列表移动到另一个任务条目列表时调用remove,
// 则由于list的值发生更改,该操作可能无法删除该条目。因此,重试,直到列表变为空。
// 在极少数情况下,此线程会看到null并退出循环,但另一个线程稍后会将条目插入另一个列表。
while (currentList != null) {
currentList.remove(this);
currentList = list;
}
}
@Override
public int compareTo(TimerTaskEntry that) {
return Long.compare(expirationMs, that.expirationMs);
}
}
TimerTaskList
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class TimerTaskList implements Delayed {
private final TimerTaskEntry root = new TimerTaskEntry(null, -1L);
{
root.next = root;
root.prev = root;
}
private final AtomicInteger taskCounter;
private final AtomicLong expiration = new AtomicLong(-1L);
public TimerTaskList(AtomicInteger taskCounter) {
this.taskCounter = taskCounter;
}
public Boolean setExpiration(Long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public Long getExpiration() {
return expiration.get();
}
/**
* Apply the supplied function to each of tasks in this list
*/
public void foreach(Consumer<TimerTask> f) {
synchronized (this) {
TimerTaskEntry entry = root.next;
while (entry != root) {
TimerTaskEntry nextEntry = entry.next;
if (!entry.cancelled()) {
f.accept(entry.timerTask);
}
entry = nextEntry;
}
}
}
/**
* Add a timer task entry to this list
*/
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = Boolean.FALSE;
while (!done) {
// 如果计时器任务条目已经在任何其他列表中,删除它。
// 在下面的同步块之外执行此操作,以避免死锁。
// 重试,直到timerTaskEntry.list变为空。
timerTaskEntry.remove();
synchronized (this) {
if (timerTaskEntry.list == null) {
// 将计时器任务条目放在列表的末尾。(root.prev指向尾部)
TimerTaskEntry tail = root.prev;
timerTaskEntry.next = root;
timerTaskEntry.prev = tail;
timerTaskEntry.list = this;
tail.next = timerTaskEntry;
root.prev = timerTaskEntry;
taskCounter.incrementAndGet();
done = true;
}
}
}
}
/**
* Remove the specified timer task entry from this list
*/
public void remove(TimerTaskEntry timerTaskEntry) {
synchronized (this) {
if (timerTaskEntry.list.equals(this)) {
timerTaskEntry.next.prev = timerTaskEntry.prev;
timerTaskEntry.prev.next = timerTaskEntry.next;
timerTaskEntry.next = null;
timerTaskEntry.prev = null;
timerTaskEntry.list = null;
taskCounter.decrementAndGet();
}
}
}
/**
* Remove all task entries and apply the supplied function to each of them
*
* @param f function
*/
public void flush(Consumer<TimerTaskEntry> f) {
synchronized (this) {
TimerTaskEntry head = root.next;
while (head != root) {
remove(head);
f.accept(head);
head = root.next;
}
expiration.set(-1L);
}
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
if (other instanceof TimerTaskList) {
return Long.compare(getExpiration(), ((TimerTaskList) other).getExpiration());
}
return 0;
}
}
TimingWheel
import lombok.Data;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Data
public class TimingWheel {
/**
* 时间轮由多个时间格组成,每个时间格就是 tickMs,它代表当前时间轮的基本时间跨度
*/
private Long tickMs;
/**
* 代表每一层时间轮的格数
*/
private Integer wheelSize;
/**
* 当前时间轮的总体时间跨度,interval=tickMs × wheelSize
*/
private Long interval;
/**
* 构造当层时间轮时候的当前时间
*/
private Long startMs;
/**
* 表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍
*/
private Long currentTime;
private TimerTaskList[] buckets;
private AtomicInteger taskCounter;
private DelayQueue<TimerTaskList> queue;
/**
* overflowWheel可能由两个并发线程通过add()进行更新和读取。
* 因此,由于JVM的双重检查锁定模式的问题,它需要是易变的
*/
private volatile TimingWheel overflowWheel;
public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.startMs = startMs;
this.currentTime = startMs - (startMs % tickMs);
this.taskCounter = taskCounter;
this.queue = queue;
this.buckets = new TimerTaskList[wheelSize];
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new TimerTaskList(taskCounter);
}
}
public void addOverflowWheel() {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(interval, wheelSize, currentTime, taskCounter, queue);
}
}
}
public Boolean add(TimerTaskEntry timerTaskEntry) {
Long expiration = timerTaskEntry.getExpirationMs();
if (timerTaskEntry.cancelled()) {
// 已取消
return Boolean.FALSE;
} else if (expiration < currentTime + tickMs) {
// 已过期
return Boolean.FALSE;
} else if (expiration < currentTime + interval) {
// 设置到当前 bucket
long virtualId = expiration / tickMs;
// 找到任务对应本时间轮的bucket
TimerTaskList bucket = buckets[(int) (virtualId % wheelSize.longValue())];
bucket.add(timerTaskEntry);
// 设置当前 bucket 过期时间
// 只有本bucket内的任务都过期后才会bucket.setExpiration返回true此时将bucket放入延迟队列
if (bucket.setExpiration(virtualId * tickMs)) {
// bucket是一个TimerTaskList,它实现了java.util.concurrent.Delayed接口,里面是一个多任务组成的链表
// bucket 需要排队,因为它是一个过期的 bucket
// 我们只需要在 bucket 的过期时间发生变化时排队,即轮子已经前进,以前的 bucket 得到重用;
// 在同一轮循环内设置过期的进一步调用将传入相同的值,因此返回 false,因此具有相同过期的 bucket 将不会多次排队。
return queue.offer(bucket);
}
return Boolean.TRUE;
} else {
// Out of the interval. Put it into the parent timer
//任务的过期时间不在本时间轮周期内说明需要升级时间轮,如果不存在则构造上一层时间轮,继续用上一层时间轮添加任务
if (overflowWheel == null) {
addOverflowWheel();
}
return overflowWheel.add(timerTaskEntry);
}
}
/**
* 尝试推进时间轮
*
* @param timeMs
*/
public void advanceClock(Long timeMs) {
if (timeMs >= currentTime + tickMs) {
// 把当前时间打平为时间轮tickMs的整数倍
currentTime = timeMs - (timeMs % tickMs);
// 如果有溢流轮,尝试提前溢流轮的时钟
// Try to advance the clock of the overflow wheel if present
//驱动上层时间轮,这里的传给上层的currentTime时间是本层时间轮打平过的,但是在上层时间轮还是会继续打平
if (overflowWheel != null) {
overflowWheel.advanceClock(currentTime);
}
}
}
}
TimerTaskTest
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.LocalDateTime;
public class TimerTaskTest {
private Timer timer = null;
@BeforeEach
public void setup() {
timer = new SystemTimer("test", 1L, 3);
}
@AfterEach
public void teardown() {
timer.shutdown();
}
@Test
public void testDelayTask() throws IOException {
System.out.println("提交任务:" + LocalDateTime.now());
timer.add(new TimerTask(1000L) {
@Override
public void run() {
System.out.println("执行任务1:" + LocalDateTime.now() + "," + Thread.currentThread().getName());
}
});
timer.add(new TimerTask(2000L) {
@Override
public void run() {
System.out.println("执行任务2:" + LocalDateTime.now() + "," + Thread.currentThread().getName());
}
});
timer.add(new TimerTask(100L) {
@Override
public void run() {
System.out.println("执行任务3:" + LocalDateTime.now() + "," + Thread.currentThread().getName());
}
});
for (; ; ) {
timer.advanceClock(200L);
}
}
}