diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java
index a2aba2eb8..df16985c2 100644
--- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java
+++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java
@@ -5,6 +5,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -14,12 +15,18 @@ import java.util.stream.Stream;
* 1.数据分批并行处理
* 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
* 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
- * 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积
+ *
+ *
+ * 适用场景:
+ *
+ * 1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积 {@link #process(List, int, Function)}
+ * 2.普通查询接口加速 {@link #processByWarp(Warp[])}
*
*
* @author likuan
*/
public class RecyclableBatchThreadPoolExecutor {
+
private final ExecutorService executor;
public RecyclableBatchThreadPoolExecutor(int poolSize){
@@ -90,7 +97,6 @@ public class RecyclableBatchThreadPoolExecutor {
* @param processor 单条数据处理函数
* @return 处理结果集合
*/
- @SuppressWarnings("unchecked")
public List process(List data, int batchSize, Function processor) {
if (batchSize < 1) {
throw new IllegalArgumentException("batchSize >= 1");
@@ -99,7 +105,7 @@ public class RecyclableBatchThreadPoolExecutor {
int batchCount = batches.size();
int minusOne = batchCount - 1;
ArrayDeque> taskQueue = new ArrayDeque<>(minusOne);
- Map>> futuresMap = new HashMap<>();
+ Map>> futuresMap = new HashMap<>();
// 提交前 batchCount-1 批任务
for (int i = 0 ; i < minusOne ; i++) {
final int index = i;
@@ -107,15 +113,15 @@ public class RecyclableBatchThreadPoolExecutor {
taskQueue.add(task);
futuresMap.put(i,executor.submit(task));
}
- Object[] arr = new Object[batchCount];
+ @SuppressWarnings("unchecked")
+ List[] resultArr = new ArrayList[batchCount];
// 处理最后一批
- arr[minusOne] = processBatch(batches.get(minusOne), processor);
+ resultArr[minusOne] = processBatch(batches.get(minusOne), processor);
// 处理剩余任务
- processRemainingTasks(taskQueue, futuresMap,arr);
+ processRemainingTasks(taskQueue, futuresMap,resultArr);
//排序、过滤null
- return Stream.of(arr)
+ return Stream.of(resultArr)
.filter(Objects::nonNull)
- .map(p -> (List) p)
.flatMap(List::stream)
.collect(Collectors.toList());
}
@@ -124,20 +130,20 @@ public class RecyclableBatchThreadPoolExecutor {
* 处理剩余任务并收集结果
* @param taskQueue 任务队列
* @param futuresMap 异步任务映射
- * @param arr 结果存储数组
+ * @param resultArr 结果存储数组
*/
- private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap,Object[] arr) {
+ private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap, List[] resultArr) {
// 主消费未执行任务
IdempotentTask task;
while ((task = taskQueue.poll()) != null) {
try {
- ResultWarp call = task.call();
+ TaskResult call = task.call();
if (call.effective) {
// 取消被主线程执行任务
- Future> future = futuresMap.remove(task.index);
+ Future> future = futuresMap.remove(task.index);
future.cancel(false);
//加入结果集
- arr[task.index] = call.result;
+ resultArr[task.index] = call.result;
}
} catch (Exception e) {
// 不处理异常
@@ -146,9 +152,9 @@ public class RecyclableBatchThreadPoolExecutor {
}
futuresMap.forEach((index,future)->{
try {
- ResultWarp resultWarp = future.get();
- if(resultWarp.effective){
- arr[index] = resultWarp.result;
+ TaskResult taskResult = future.get();
+ if(taskResult.effective){
+ resultArr[index] = taskResult.result;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
@@ -159,7 +165,7 @@ public class RecyclableBatchThreadPoolExecutor {
/**
* 幂等任务包装类,确保任务只执行一次
*/
- private static class IdempotentTask implements Callable> {
+ private static class IdempotentTask implements Callable> {
private final int index;
private final Callable> delegate;
@@ -171,21 +177,21 @@ public class RecyclableBatchThreadPoolExecutor {
}
@Override
- public ResultWarp call() throws Exception {
+ public TaskResult call() throws Exception {
if (executed.compareAndSet(false, true)) {
- return new ResultWarp<>(delegate.call(), true);
+ return new TaskResult<>(delegate.call(), true);
}
- return new ResultWarp<>(null, false);
+ return new TaskResult<>(null, false);
}
}
/**
* 结果包装类,标记结果有效性
*/
- private static class ResultWarp{
+ private static class TaskResult{
private final List result;
private final boolean effective;
- ResultWarp(List result, boolean effective){
+ TaskResult(List result, boolean effective){
this.result = result;
this.effective = effective;
}
@@ -197,7 +203,7 @@ public class RecyclableBatchThreadPoolExecutor {
* @param batchSize 每批次数据量
* @return 分片后的二维集合
*/
- public static List> splitData(List data, int batchSize) {
+ private static List> splitData(List data, int batchSize) {
int batchCount = (data.size() + batchSize - 1) / batchSize;
return new AbstractList>() {
@Override
@@ -220,8 +226,69 @@ public class RecyclableBatchThreadPoolExecutor {
* @param processor 处理函数
* @return 处理结果
*/
- public static List processBatch(List batch, Function processor) {
+ private static List processBatch(List batch, Function processor) {
return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList());
}
+ /**
+ * 处理Warp集合
+ *
+ * {@code
+ * Warp warp1 = Warp.of(this::select1);
+ * Warp> warp2 = Warp.of(this::select2);
+ * executor.processByWarp(warp1, warp2);
+ * String r1 = warp1.get();
+ * List r2 = warp2.get();
+ * }
+ *
+ * @param warps Warp集合
+ * @return Warp集合,此方法返回结果为空的不会被过滤
+ */
+ public List> processByWarp(Warp>... warps) {
+ return process(Arrays.asList(warps), 1, Warp::execute);
+ }
+
+ /**
+ * 处理逻辑包装类
+ * @param 结果类型
+ */
+ public static class Warp{
+
+ private Warp(Supplier supplier){
+ Objects.requireNonNull(supplier);
+ this.supplier = supplier;
+ }
+
+ /**
+ * 创建Warp
+ * @param supplier 执行逻辑
+ * @return Warp
+ * @param 结果类型
+ */
+ public static Warp of(Supplier supplier){
+ return new Warp<>(supplier);
+ }
+
+ private final Supplier supplier;
+ private R result;
+
+ /**
+ * 获取结果
+ * @return 结果
+ */
+ public R get() {
+ return result;
+ }
+
+ /**
+ * 执行
+ * @return this
+ */
+ public Warp execute() {
+ result = supplier.get();
+ return this;
+ }
+
+ }
+
}
diff --git a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java
index 3fde81eac..d8dc2f182 100644
--- a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java
+++ b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java
@@ -1,11 +1,9 @@
package cn.hutool.core.thread;
+import cn.hutool.core.thread.RecyclableBatchThreadPoolExecutor.Warp;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -17,6 +15,11 @@ import java.util.function.Function;
*/
public class RecyclableBatchThreadPoolExecutorTest {
+
+ /**
+ * 批量处理数据
+ * @throws InterruptedException
+ */
@Test
public void test() throws InterruptedException {
int corePoolSize = 10;// 线程池大小
@@ -25,6 +28,24 @@ public class RecyclableBatchThreadPoolExecutorTest {
test(corePoolSize,batchSize,clientCount);
}
+ /**
+ * 普通查询接口加速
+ */
+ @Test
+ public void test2() {
+ RecyclableBatchThreadPoolExecutor executor = new RecyclableBatchThreadPoolExecutor(10);
+ long s = System.nanoTime();
+ Warp warp1 = Warp.of(this::select1);
+ Warp> warp2 = Warp.of(this::select2);
+ executor.processByWarp(warp1, warp2);
+ Map map = new HashMap<>();
+ map.put("key1",warp1.get());
+ map.put("key2",warp2.get());
+ long d = System.nanoTime() - s;
+ System.out.printf("总耗时:%.2f秒%n",d/1e9);
+ System.out.println(map);
+ }
+
public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{
RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize);
// 模拟多个调用者线程提交任务
@@ -73,4 +94,22 @@ public class RecyclableBatchThreadPoolExecutorTest {
return list;
}
+ private String select1() {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return "1";
+ }
+
+ private List select2() {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Arrays.asList("1","2","3");
+ }
+
}