diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java b/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java index 3fac35e1d..724b9daa8 100755 --- a/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/BlockPolicy.java @@ -10,6 +10,8 @@ import java.util.function.Consumer; * 如果阻塞过程中被中断,就会抛出{@link InterruptedException}异常
* 有时候在线程池内访问第三方接口,只希望固定并发数去访问,并且不希望丢弃任务时使用此策略,队列满的时候会处于阻塞状态(例如刷库的场景) * + * 其他系统内置的拒绝策略,见hutool定义的枚举 {@link RejectPolicy} 线程拒绝策略枚举. + * * @author luozongle * @since 5.8.0 */ diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java b/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java index 0fdb8f2ce..65806b8ce 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RejectPolicy.java @@ -20,7 +20,7 @@ public enum RejectPolicy { DISCARD(new ThreadPoolExecutor.DiscardPolicy()), /** 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) */ DISCARD_OLDEST(new ThreadPoolExecutor.DiscardOldestPolicy()), - /** 由主线程来直接执行 */ + /** 调用者线程来执行被丢弃的任务;一般可能是由主线程来直接执行 */ CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy()), /** 当任务队列过长时处于阻塞状态,直到添加到队列中,固定并发数去访问,并且不希望丢弃任务时使用此策略 */ BLOCK(new BlockPolicy()); diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java index 4e9621a11..c244ff5b6 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java @@ -4,6 +4,8 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -272,11 +274,116 @@ public class ThreadUtil { /** * 新建一个CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 * - * @param threadCount 线程数量 + * 示例:等6个同学都离开教室,班长才能锁门。 + *
{@code
+	 * CountDownLatch latch = new CountDownLatch(6); // 总共任务是6
+	 *  for (int x = 0; x < 6; x++) {
+	 *   //具体生产任务,可以用线程池替换
+	 *     new Thread(()->{
+	 *         try {
+	 *           //每个同学在教室待上几秒秒钟
+	 *             int time = ThreadLocalRandom.current().nextInt(1, 5);
+	 *             TimeUnit.SECONDS.sleep(time);
+	 *         } catch (InterruptedException e) {
+	 *             e.printStackTrace();
+	 *         }
+	 *         System.out.printf("同学【%s】,已经离开了教室%n", Thread.currentThread().getName());
+	 *         latch.countDown(); // 减1(每离开一个同学,减去1),必须执行,可以放到 try...finally中
+	 *     },"同学 - " + x).start();
+	 * }
+	 * latch.await(); // 等待计数为0后再解除阻塞;(等待所有同学离开)
+	 * System.out.println("【主线程】所有同学都离开了教室,开始锁教室大门了。");
+	 * }
+	 * 
+ * + * 该示例,也可以用:{@link Phaser} 移相器 进行实现 + * + * @param taskCount 任务数量 * @return CountDownLatch */ - public static CountDownLatch newCountDownLatch(final int threadCount) { - return new CountDownLatch(threadCount); + public static CountDownLatch newCountDownLatch(final int taskCount) { + return new CountDownLatch(taskCount); + } + + /** + * 新建一个CycleBarrier 循环栅栏,一个同步辅助类 + * + * 示例:7个同学,集齐7个龙珠,7个同学一起召唤神龙;前后集齐了2次 + *
{@code
+	 *         AtomicInteger times = new AtomicInteger();
+	 *         CyclicBarrier barrier = new CyclicBarrier(7, ()->{
+	 *             System.out.println("");
+	 *             System.out.println("");
+	 *             System.out.println("【循环栅栏业务处理】7个子线程 都收集了一颗龙珠,七颗龙珠已经收集齐全,开始召唤神龙。" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+	 *             times.getAndIncrement();
+	 *         }); // 现在设置的栅栏的数量为2
+	 *         for (int x = 0; x < 7; x++) {   // 创建7个线程, 当然也可以使用线程池替换。
+	 *             new Thread(() -> {
+	 *                 while (times.get() < 2) {
+	 *                     try {
+	 *                         System.out.printf("【Barrier - 收集龙珠】当前的线程名称:%s%n", Thread.currentThread().getName());
+	 *                         int time = ThreadLocalRandom.current().nextInt(1, 10); // 等待一段时间,模拟线程的执行时间
+	 *                         TimeUnit.SECONDS.sleep(time); // 模拟业务延迟,收集龙珠的时间
+	 *                         barrier.await(); // 等待,凑够了7个等待的线程
+	 *                         System.err.printf("〖Barrier - 举起龙珠召唤神龙〗当前的线程名称:%s\t%s%n", Thread.currentThread().getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+	 *                         if (barrier.getParties() >= 7) {
+	 *                             barrier.reset(); // 重置栅栏,等待下一次的召唤。
+	 *                         }
+	 *                     } catch (Exception e) {
+	 *                         e.printStackTrace();
+	 *                     }
+	 *                 }
+	 *             }, "线程 - " + x).start();
+	 *         }
+	 * }
+ * + * 该示例,也可以用:{@link Phaser} 移相器 进行实现 + * @param taskCount 任务数量 + * @return 循环栅栏 + */ + public static CyclicBarrier newCyclicBarrier(final int taskCount) { + return new CyclicBarrier(taskCount); + } + + /** + * 新建一个Phaser,一个同步辅助类,jdk1.7提供,可以完全替代CountDownLatch; + * @since 6.0.1 + * @author dazer + * + * Pharser: 移相器、相位器,可重用同步屏障; + * 功能可以替换:{@link CyclicBarrier}(固定线程)循环栅栏、{@link CountDownLatch}(固定计数)倒数计数、加上分层功能 + * + * 示例1:等6个同学都离开教室,班长才能锁门。 + *
{@code
+	 * Phaser phaser = new Phaser(6); // 总共任务是6
+	 *  for (int x = 0; x < 6; x++) {
+	 *   //具体生产任务,可以用线程池替换
+	 *     new Thread(()->{
+	 *         try {
+	 *             //每个同学在教室待上几秒秒钟
+	 *           	int time = ThreadLocalRandom.current().nextInt(1, 5);
+	 *             TimeUnit.SECONDS.sleep(5);
+	 *         } catch (InterruptedException e) {
+	 *             e.printStackTrace();
+	 *         }
+	 *         System.out.printf("同学【%s】,已经离开了教室%n", Thread.currentThread().getName());
+	 *         phaser.arrive(); // 减1 等价于countDown()方法(每离开一个同学,减去1),必须执行,可以放到 try...finally中
+	 *     },"同学 - " + x).start();
+	 * }
+	 * phaser.awaitAdvance(phaser.getPhase()); // 等价于latch.await()方法 等待计数为0后再解除阻塞;(等待所有同学离开)
+	 * System.out.println("【主线程】所有同学都离开了教室,开始锁教室大门了。");
+	 * }
+	 * 
+ * + * 示例2:7个同学,集齐7个龙珠,7个同学一起召唤神龙; + * 只需要:phaser.arrive(); --> phaser.arriveAndAwaitAdvance() //等待其他的线程就位 + * 该示例,也可以用:{@link CyclicBarrier} 进行实现 + * + * @param taskCount 任务数量 + * @return Phaser + */ + public static Phaser newPhaser(final int taskCount) { + return new Phaser(taskCount); } /**