fix method

This commit is contained in:
Looly
2021-05-12 02:10:02 +08:00
parent 24a36b53d5
commit 7f2f97353c
2 changed files with 78 additions and 25 deletions

View File

@@ -7,7 +7,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 系统计时器 * 系统计时器
*
* @author eliasyaoyc, looly
*/ */
public class SystemTimer { public class SystemTimer {
/** /**
@@ -21,28 +23,54 @@ public class SystemTimer {
private final DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>(); private final DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/** /**
* 过期任务执行线程 * 执行队列取元素超时时长单位毫秒默认100
*/ */
private final ExecutorService workerThreadPool; private long delayQueueTimeout = 100;
/** /**
* 轮询delayQueue获取过期任务线程 * 轮询delayQueue获取过期任务线程
*/ */
private final ExecutorService bossThreadPool; private ExecutorService bossThreadPool;
/** /**
* 构造函数 * 构造
* @param timeout 超时时长
*/ */
public SystemTimer(int timeout) { public SystemTimer() {
timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue); timeWheel = new TimingWheel(1, 20, delayQueue::offer);
workerThreadPool = ThreadUtil.newExecutor(100); }
/**
* 设置执行队列取元素超时时长,单位毫秒
* @param delayQueueTimeout 执行队列取元素超时时长,单位毫秒
* @return this
*/
public SystemTimer setDelayQueueTimeout(long delayQueueTimeout){
this.delayQueueTimeout = delayQueueTimeout;
return this;
}
/**
* 启动,异步
*
* @return this
*/
public SystemTimer start() {
bossThreadPool = ThreadUtil.newSingleExecutor(); bossThreadPool = ThreadUtil.newSingleExecutor();
bossThreadPool.submit(() -> { bossThreadPool.submit(() -> {
while (true) { while (true) {
this.advanceClock(timeout); if(false == advanceClock()){
break;
}
} }
}); });
return this;
}
/**
* 强制结束
*/
public void stop(){
this.bossThreadPool.shutdown();
} }
/** /**
@@ -50,25 +78,39 @@ public class SystemTimer {
*/ */
public void addTask(TimerTask timerTask) { public void addTask(TimerTask timerTask) {
//添加失败任务直接执行 //添加失败任务直接执行
if (!timeWheel.addTask(timerTask)) { if (false == timeWheel.addTask(timerTask)) {
workerThreadPool.submit(timerTask.getTask()); ThreadUtil.execAsync(timerTask.getTask());
} }
} }
/** /**
* 获取过期任务 * 指针前进并获取过期任务
*
* @return 是否结束
*/ */
private void advanceClock(long timeout) { private boolean advanceClock() {
try { try {
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); TimerTaskList timerTaskList = poll();
if (timerTaskList != null) { if (null != timerTaskList) {
//推进时间 //推进时间
timeWheel.advanceClock(timerTaskList.getExpire()); timeWheel.advanceClock(timerTaskList.getExpire());
//执行过期任务(包含降级操作) //执行过期任务(包含降级操作)
timerTaskList.flush(this::addTask); timerTaskList.flush(this::addTask);
} }
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
// ignore return false;
} }
return true;
}
/**
* 执行队列取任务列表
* @return 任务列表
* @throws InterruptedException 中断异常
*/
private TimerTaskList poll() throws InterruptedException {
return this.delayQueueTimeout > 0 ?
delayQueue.poll(delayQueueTimeout, TimeUnit.MILLISECONDS) :
delayQueue.poll();
} }
} }

View File

@@ -2,7 +2,7 @@ package cn.hutool.cron.timingwheel;
import cn.hutool.log.StaticLog; import cn.hutool.log.StaticLog;
import java.util.concurrent.DelayQueue; import java.util.function.Consumer;
/** /**
* 多层时间轮,常用于延时任务。<br> * 多层时间轮,常用于延时任务。<br>
@@ -44,9 +44,20 @@ public class TimingWheel {
private volatile TimingWheel overflowWheel; private volatile TimingWheel overflowWheel;
/** /**
* 执行等待列表 * 任务处理器
*/ */
private final DelayQueue<TimerTaskList> delayQueue; private final Consumer<TimerTaskList> consumer;
/**
* 构造
*
* @param tickMs 一个时间槽的范围,单位毫秒
* @param wheelSize 时间轮大小
* @param consumer 任务处理器
*/
public TimingWheel(long tickMs, int wheelSize, Consumer<TimerTaskList> consumer) {
this(tickMs, wheelSize, System.currentTimeMillis(), consumer);
}
/** /**
* 构造 * 构造
@@ -54,9 +65,9 @@ public class TimingWheel {
* @param tickMs 一个时间槽的范围,单位毫秒 * @param tickMs 一个时间槽的范围,单位毫秒
* @param wheelSize 时间轮大小 * @param wheelSize 时间轮大小
* @param currentTime 当前时间 * @param currentTime 当前时间
* @param delayQueue 执行等待链表 * @param consumer 任务处理器
*/ */
public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) { public TimingWheel(long tickMs, int wheelSize, long currentTime, Consumer<TimerTaskList> consumer) {
this.currentTime = currentTime; this.currentTime = currentTime;
this.tickMs = tickMs; this.tickMs = tickMs;
this.wheelSize = wheelSize; this.wheelSize = wheelSize;
@@ -64,7 +75,7 @@ public class TimingWheel {
this.timerTaskLists = new TimerTaskList[wheelSize]; this.timerTaskLists = new TimerTaskList[wheelSize];
//currentTime为tickMs的整数倍 这里做取整操作 //currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs); this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue; this.consumer = consumer;
} }
/** /**
@@ -91,7 +102,7 @@ public class TimingWheel {
timerTaskList.addTask(timerTask); timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) { if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayQueue中 //添加到delayQueue中
delayQueue.offer(timerTaskList); consumer.accept(timerTaskList);
} }
} else { } else {
//放到上一层的时间轮 //放到上一层的时间轮
@@ -122,7 +133,7 @@ public class TimingWheel {
if (overflowWheel == null) { if (overflowWheel == null) {
synchronized (this) { synchronized (this) {
if (overflowWheel == null) { if (overflowWheel == null) {
overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue); overflowWheel = new TimingWheel(interval, wheelSize, currentTime, consumer);
} }
} }
} }