From 7f2f97353c5e0156eff5db438e9ba448f0e70574 Mon Sep 17 00:00:00 2001 From: Looly Date: Wed, 12 May 2021 02:10:02 +0800 Subject: [PATCH] fix method --- .../hutool/cron/timingwheel/SystemTimer.java | 76 ++++++++++++++----- .../hutool/cron/timingwheel/TimingWheel.java | 27 +++++-- 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java index 29bf5948e..00bb1e835 100644 --- a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/SystemTimer.java @@ -7,7 +7,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** - * 系统计时器, + * 系统计时器 + * + * @author eliasyaoyc, looly */ public class SystemTimer { /** @@ -21,28 +23,54 @@ public class SystemTimer { private final DelayQueue delayQueue = new DelayQueue<>(); /** - * 过期任务执行线程 + * 执行队列取元素超时时长,单位毫秒,默认100 */ - private final ExecutorService workerThreadPool; + private long delayQueueTimeout = 100; /** * 轮询delayQueue获取过期任务线程 */ - private final ExecutorService bossThreadPool; + private ExecutorService bossThreadPool; /** - * 构造函数 - * @param timeout 超时时长 + * 构造 */ - public SystemTimer(int timeout) { - timeWheel = new TimingWheel(1, 20, System.currentTimeMillis(), delayQueue); - workerThreadPool = ThreadUtil.newExecutor(100); + public SystemTimer() { + timeWheel = new TimingWheel(1, 20, delayQueue::offer); + } + + /** + * 设置执行队列取元素超时时长,单位毫秒 + * @param delayQueueTimeout 执行队列取元素超时时长,单位毫秒 + * @return this + */ + public SystemTimer setDelayQueueTimeout(long delayQueueTimeout){ + this.delayQueueTimeout = delayQueueTimeout; + return this; + } + + /** + * 启动,异步 + * + * @return this + */ + public SystemTimer start() { bossThreadPool = ThreadUtil.newSingleExecutor(); bossThreadPool.submit(() -> { while (true) { - this.advanceClock(timeout); + if(false == advanceClock()){ + break; + } } }); + return this; + } + + /** + * 强制结束 + */ + public void stop(){ + this.bossThreadPool.shutdown(); } /** @@ -50,25 +78,39 @@ public class SystemTimer { */ public void addTask(TimerTask timerTask) { //添加失败任务直接执行 - if (!timeWheel.addTask(timerTask)) { - workerThreadPool.submit(timerTask.getTask()); + if (false == timeWheel.addTask(timerTask)) { + ThreadUtil.execAsync(timerTask.getTask()); } } /** - * 获取过期任务 + * 指针前进并获取过期任务 + * + * @return 是否结束 */ - private void advanceClock(long timeout) { + private boolean advanceClock() { try { - TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); - if (timerTaskList != null) { + TimerTaskList timerTaskList = poll(); + if (null != timerTaskList) { //推进时间 timeWheel.advanceClock(timerTaskList.getExpire()); //执行过期任务(包含降级操作) timerTaskList.flush(this::addTask); } } catch (InterruptedException ignore) { - // ignore + return false; } + return true; + } + + /** + * 执行队列取任务列表 + * @return 任务列表 + * @throws InterruptedException 中断异常 + */ + private TimerTaskList poll() throws InterruptedException { + return this.delayQueueTimeout > 0 ? + delayQueue.poll(delayQueueTimeout, TimeUnit.MILLISECONDS) : + delayQueue.poll(); } } diff --git a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java index 7b53e2990..9ea7c0287 100644 --- a/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java +++ b/hutool-cron/src/main/java/cn/hutool/cron/timingwheel/TimingWheel.java @@ -2,7 +2,7 @@ package cn.hutool.cron.timingwheel; import cn.hutool.log.StaticLog; -import java.util.concurrent.DelayQueue; +import java.util.function.Consumer; /** * 多层时间轮,常用于延时任务。
@@ -44,9 +44,20 @@ public class TimingWheel { private volatile TimingWheel overflowWheel; /** - * 执行等待列表 + * 任务处理器 */ - private final DelayQueue delayQueue; + private final Consumer consumer; + + /** + * 构造 + * + * @param tickMs 一个时间槽的范围,单位毫秒 + * @param wheelSize 时间轮大小 + * @param consumer 任务处理器 + */ + public TimingWheel(long tickMs, int wheelSize, Consumer consumer) { + this(tickMs, wheelSize, System.currentTimeMillis(), consumer); + } /** * 构造 @@ -54,9 +65,9 @@ public class TimingWheel { * @param tickMs 一个时间槽的范围,单位毫秒 * @param wheelSize 时间轮大小 * @param currentTime 当前时间 - * @param delayQueue 执行等待链表 + * @param consumer 任务处理器 */ - public TimingWheel(long tickMs, int wheelSize, long currentTime, DelayQueue delayQueue) { + public TimingWheel(long tickMs, int wheelSize, long currentTime, Consumer consumer) { this.currentTime = currentTime; this.tickMs = tickMs; this.wheelSize = wheelSize; @@ -64,7 +75,7 @@ public class TimingWheel { this.timerTaskLists = new TimerTaskList[wheelSize]; //currentTime为tickMs的整数倍 这里做取整操作 this.currentTime = currentTime - (currentTime % tickMs); - this.delayQueue = delayQueue; + this.consumer = consumer; } /** @@ -91,7 +102,7 @@ public class TimingWheel { timerTaskList.addTask(timerTask); if (timerTaskList.setExpiration(virtualId * tickMs)) { //添加到delayQueue中 - delayQueue.offer(timerTaskList); + consumer.accept(timerTaskList); } } else { //放到上一层的时间轮 @@ -122,7 +133,7 @@ public class TimingWheel { if (overflowWheel == null) { synchronized (this) { if (overflowWheel == null) { - overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue); + overflowWheel = new TimingWheel(interval, wheelSize, currentTime, consumer); } } }