diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java index f6a3dc20f..a2aba2eb8 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -10,9 +10,12 @@ import java.util.stream.Stream; /** * 可召回批处理线程池执行器 + *
  * 1.数据分批并行处理
- * 2.线程安全,可用同时执行多个任务
- * 3.主线程、线程池混合执行,主线程空闲时会尝试召回线程池队列中的任务执行,无需担心任务阻塞
+ * 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
+ * 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
+ * 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积
+ * 
* * @author likuan */ @@ -25,8 +28,10 @@ public class RecyclableBatchThreadPoolExecutor { /** * 建议的构造方法 - * 使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 - * 假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化 + *
+	 * 1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
+	 * 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
+	 * 
* * @param poolSize 线程池大小 * @param threadPoolPrefix 线程名前缀 @@ -61,11 +66,22 @@ public class RecyclableBatchThreadPoolExecutor { executor.shutdown(); } + /** + * 获取线程池 + * @return ExecutorService + */ + public ExecutorService getExecutor(){ + return executor; + } + /** * 分批次处理数据 + *
 	 * 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果{@link Function}返回null即可
-	 * 2.异常在{@link Function}中自行处理
-	 * 3.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 2.{@link Function}需自行处理异常、保证线程安全
+	 * 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝
+	 * 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 
* * @param 输入数据类型 * @param 输出数据类型 @@ -81,10 +97,11 @@ public class RecyclableBatchThreadPoolExecutor { } List> batches = splitData(data, batchSize); int batchCount = batches.size(); - ConcurrentLinkedQueue> taskQueue = new ConcurrentLinkedQueue<>(); + int minusOne = batchCount - 1; + ArrayDeque> taskQueue = new ArrayDeque<>(minusOne); Map>> futuresMap = new HashMap<>(); // 提交前 batchCount-1 批任务 - for (int i = 0 ; i < batchCount-1 ; i++) { + for (int i = 0 ; i < minusOne ; i++) { final int index = i; IdempotentTask task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor)); taskQueue.add(task); @@ -92,7 +109,7 @@ public class RecyclableBatchThreadPoolExecutor { } Object[] arr = new Object[batchCount]; // 处理最后一批 - arr[batchCount-1] = processBatch(batches.get(batchCount-1), processor); + arr[minusOne] = processBatch(batches.get(minusOne), processor); // 处理剩余任务 processRemainingTasks(taskQueue, futuresMap,arr); //排序、过滤null