This commit is contained in:
Looly
2021-05-12 01:39:32 +08:00
parent 105e712a68
commit 24a36b53d5
4 changed files with 134 additions and 71 deletions

View File

@@ -6,6 +6,9 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* 系统计时器,
*/
public class SystemTimer { public class SystemTimer {
/** /**
* 底层时间轮 * 底层时间轮
@@ -25,10 +28,11 @@ public class SystemTimer {
/** /**
* 轮询delayQueue获取过期任务线程 * 轮询delayQueue获取过期任务线程
*/ */
private ExecutorService bossThreadPool; private final ExecutorService bossThreadPool;
/** /**
* 构造函数 * 构造函数
* @param timeout 超时时长
*/ */
public SystemTimer(int timeout) { public SystemTimer(int timeout) {
timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue); timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue);
@@ -59,7 +63,7 @@ public class SystemTimer {
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (timerTaskList != null) { if (timerTaskList != null) {
//推进时间 //推进时间
timeWheel.advanceClock(timerTaskList.getExpiration()); timeWheel.advanceClock(timerTaskList.getExpire());
//执行过期任务(包含降级操作) //执行过期任务(包含降级操作)
timerTaskList.flush(this::addTask); timerTaskList.flush(this::addTask);
} }

View File

@@ -1,6 +1,12 @@
package cn.hutool.cron.timingwheel; package cn.hutool.cron.timingwheel;
/**
* 延迟任务
*
* @author eliasyaoyc, looly
*/
public class TimerTask { public class TimerTask {
/** /**
* 延迟时间 * 延迟时间
*/ */
@@ -27,22 +33,34 @@ public class TimerTask {
protected TimerTask prev; protected TimerTask prev;
/** /**
* 描述 * 任务描述
*/ */
public String desc; public String desc;
/**
* 构造
*
* @param task 任务
* @param delayMs 延迟毫秒数(以当前时间为准)
*/
public TimerTask(Runnable task, long delayMs) { public TimerTask(Runnable task, long delayMs) {
this.delayMs = System.currentTimeMillis() + delayMs; this.delayMs = System.currentTimeMillis() + delayMs;
this.task = task; this.task = task;
this.timerTaskList = null;
this.next = null;
this.prev = null;
} }
/**
* 获取任务
*
* @return 任务
*/
public Runnable getTask() { public Runnable getTask() {
return task; return task;
} }
/**
* 获取延迟时间点,即创建时间+延迟时长(单位毫秒)
* @return 延迟时间点
*/
public long getDelayMs() { public long getDelayMs() {
return delayMs; return delayMs;
} }

View File

@@ -6,41 +6,55 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
* @Author: siran.yao * 任务队列,任务双向链表
* @time: 2020/5/8:上午11:13 *
* @author siran.yaolooly
*/ */
public class TimerTaskList implements Delayed { public class TimerTaskList implements Delayed {
/** /**
* 过期时间 * 过期时间
*/ */
private AtomicLong expiration = new AtomicLong(-1L); private final AtomicLong expire;
/** /**
* 根节点 * 根节点
*/ */
private TimerTask root = new TimerTask( null,-1L); private final TimerTask root;
{ /**
* 构造
*/
public TimerTaskList(){
expire = new AtomicLong(-1L);
root = new TimerTask( null,-1L);
root.prev = root; root.prev = root;
root.next = root; root.next = root;
} }
/** /**
* 设置过期时间 * 设置过期时间
*
* @param expire 过期时间,单位毫秒
* @return 是否设置成功
*/ */
public boolean setExpiration(long expire) { public boolean setExpiration(long expire) {
return expiration.getAndSet(expire) != expire; return this.expire.getAndSet(expire) != expire;
} }
/** /**
* 获取过期时间 * 获取过期时间
* @return 过期时间
*/ */
public long getExpiration() { public long getExpire() {
return expiration.get(); return expire.get();
} }
/** /**
* 新增任务 * 新增任务,将任务加入到双向链表的头部
*
* @param timerTask 延迟任务
*/ */
public void addTask(TimerTask timerTask) { public void addTask(TimerTask timerTask) {
synchronized (this) { synchronized (this) {
@@ -57,6 +71,8 @@ public class TimerTaskList implements Delayed {
/** /**
* 移除任务 * 移除任务
*
* @param timerTask 任务
*/ */
public void removeTask(TimerTask timerTask) { public void removeTask(TimerTask timerTask) {
synchronized (this) { synchronized (this) {
@@ -71,27 +87,29 @@ public class TimerTaskList implements Delayed {
} }
/** /**
* 重新分配 * 重新分配,即将列表中的任务全部处理
*
* @param flush 任务处理函数
*/ */
public synchronized void flush(Consumer<TimerTask> flush) { public synchronized void flush(Consumer<TimerTask> flush) {
TimerTask timerTask = root.next; TimerTask timerTask = root.next;
while (!timerTask.equals(root)) { while (false == timerTask.equals(root)) {
this.removeTask(timerTask); this.removeTask(timerTask);
flush.accept(timerTask); flush.accept(timerTask);
timerTask = root.next; timerTask = root.next;
} }
expiration.set(-1L); expire.set(-1L);
} }
@Override @Override
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); return Math.max(0, unit.convert(expire.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
} }
@Override @Override
public int compareTo(Delayed o) { public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) { if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); return Long.compare(expire.get(), ((TimerTaskList) o).expire.get());
} }
return 0; return 0;
} }

View File

@@ -1,30 +1,40 @@
package cn.hutool.cron.timingwheel; package cn.hutool.cron.timingwheel;
import cn.hutool.log.StaticLog;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
/**
* 多层时间轮,常用于延时任务。<br>
* 时间轮是一种环形数据结构,由多个槽组成,每个槽中存放任务集合。<br>
* 一个单独的线程推进时间一槽一槽的移动,并执行槽中的任务。
*
* @author eliasyaoyc, looly
*/
public class TimingWheel { public class TimingWheel {
/** /**
* 一个时间槽的范围 * 一个时间槽的范围
*/ */
private long tickMs; private final long tickMs;
/** /**
* 时间轮大小 * 时间轮大小,时间轮中时间槽的个数
*/ */
private int wheelSize; private final int wheelSize;
/** /**
* 时间跨度 * 时间跨度,当前时间轮总间隔,即单个槽的跨度*槽个数
*/ */
private long interval; private final long interval;
/** /**
* 时间槽 * 时间槽
*/ */
private TimerTaskList[] timerTaskLists; private final TimerTaskList[] timerTaskLists;
/** /**
* 当前时间 * 当前时间,指向当前操作的时间格,代表当前时间
*/ */
private long currentTime; private long currentTime;
@@ -34,10 +44,18 @@ public class TimingWheel {
private volatile TimingWheel overflowWheel; private volatile TimingWheel overflowWheel;
/** /**
* 一个Timer只有一个delayQueue * 执行等待列表
*/ */
private DelayQueue<TimerTaskList> delayQueue; private final DelayQueue<TimerTaskList> delayQueue;
/**
* 构造
*
* @param tickMs 一个时间槽的范围,单位毫秒
* @param wheelSize 时间轮大小
* @param currentTime 当前时间
* @param delayQueue 执行等待链表
*/
public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) { public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
this.currentTime = currentTime; this.currentTime = currentTime;
this.tickMs = tickMs; this.tickMs = tickMs;
@@ -47,8 +65,53 @@ public class TimingWheel {
//currentTime为tickMs的整数倍 这里做取整操作 //currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs); this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue; this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) { }
timerTaskLists[i] = new TimerTaskList();
/**
* 添加任务到时间轮
*
* @param timerTask 任务
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
//过期任务直接执行
if (expiration < currentTime + tickMs) {
return false;
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务 加入时间槽
long virtualId = expiration / tickMs;
int index = (int) (virtualId % wheelSize);
StaticLog.debug("tickMs: {} ------index: {} ------expiration: {}", tickMs, index, expiration);
TimerTaskList timerTaskList = timerTaskLists[index];
if(null == timerTaskList){
timerTaskList = new TimerTaskList();
timerTaskLists[index] = timerTaskList;
}
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayQueue中
delayQueue.offer(timerTaskList);
}
} else {
//放到上一层的时间轮
TimingWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timerTask);
}
return true;
}
/**
* 推进时间
* @param timestamp 推进的时间
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
//推进上层时间轮时间
this.getOverflowWheel().advanceClock(timestamp);
}
} }
} }
@@ -65,44 +128,4 @@ public class TimingWheel {
} }
return overflowWheel; return overflowWheel;
} }
/**
* 添加任务到时间轮
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
//过期任务直接执行
if (expiration < currentTime + tickMs) {
return false;
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务 加入时间槽
Long virtualId = expiration / tickMs;
int index = (int) (virtualId % wheelSize);
System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
TimerTaskList timerTaskList = timerTaskLists[index];
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayQueue中
delayQueue.offer(timerTaskList);
}
} else {
//放到上一层的时间轮
TimingWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timerTask);
}
return true;
}
/**
* 推进时间
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
//推进上层时间轮时间
this.getOverflowWheel().advanceClock(timestamp);
}
}
}
} }