From 8580250ce87cbe3731991be175b4dac59fcd88d6 Mon Sep 17 00:00:00 2001 From: likuan Date: Tue, 20 May 2025 15:50:40 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RecyclableBatchThreadPoolExecutor.java | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java new file mode 100644 index 000000000..f6a3dc20f --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -0,0 +1,210 @@ +package cn.hutool.core.thread; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * 可召回批处理线程池执行器 + * 1.数据分批并行处理 + * 2.线程安全,可用同时执行多个任务 + * 3.主线程、线程池混合执行,主线程空闲时会尝试召回线程池队列中的任务执行,无需担心任务阻塞 + * + * @author likuan + */ +public class RecyclableBatchThreadPoolExecutor { + private final ExecutorService executor; + + public RecyclableBatchThreadPoolExecutor(int poolSize){ + this(poolSize,"recyclable-batch-pool-"); + } + + /** + * 建议的构造方法 + * 使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 + * 假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化 + * + * @param poolSize 线程池大小 + * @param threadPoolPrefix 线程名前缀 + */ + public RecyclableBatchThreadPoolExecutor(int poolSize, String threadPoolPrefix){ + AtomicInteger threadNumber = new AtomicInteger(1); + ThreadFactory threadFactory = r -> { + Thread t = new Thread(r, threadPoolPrefix + threadNumber.getAndIncrement()); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + }; + this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),threadFactory); + } + + /** + * 自定义线程池,一般不需要使用 + * @param executor 线程池 + */ + public RecyclableBatchThreadPoolExecutor(ExecutorService executor){ + this.executor = executor; + } + + /** + * 关闭线程池 + */ + public void shutdown(){ + executor.shutdown(); + } + + /** + * 分批次处理数据 + * 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果{@link Function}返回null即可 + * 2.异常在{@link Function}中自行处理 + * 3.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池 + * + * @param 输入数据类型 + * @param 输出数据类型 + * @param data 待处理数据集合 + * @param batchSize 每批次数据量 + * @param processor 单条数据处理函数 + * @return 处理结果集合 + */ + @SuppressWarnings("unchecked") + public List process(List data, int batchSize, Function processor) { + if (batchSize < 1) { + throw new IllegalArgumentException("batchSize >= 1"); + } + List> batches = splitData(data, batchSize); + int batchCount = batches.size(); + ConcurrentLinkedQueue> taskQueue = new ConcurrentLinkedQueue<>(); + Map>> futuresMap = new HashMap<>(); + // 提交前 batchCount-1 批任务 + for (int i = 0 ; i < batchCount-1 ; i++) { + final int index = i; + IdempotentTask task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor)); + taskQueue.add(task); + futuresMap.put(i,executor.submit(task)); + } + Object[] arr = new Object[batchCount]; + // 处理最后一批 + arr[batchCount-1] = processBatch(batches.get(batchCount-1), processor); + // 处理剩余任务 + processRemainingTasks(taskQueue, futuresMap,arr); + //排序、过滤null + return Stream.of(arr) + .filter(Objects::nonNull) + .map(p -> (List) p) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + /** + * 处理剩余任务并收集结果 + * @param taskQueue 任务队列 + * @param futuresMap 异步任务映射 + * @param arr 结果存储数组 + */ + private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap,Object[] arr) { + // 主消费未执行任务 + IdempotentTask task; + while ((task = taskQueue.poll()) != null) { + try { + ResultWarp call = task.call(); + if (call.effective) { + // 取消被主线程执行任务 + Future> future = futuresMap.remove(task.index); + future.cancel(false); + //加入结果集 + arr[task.index] = call.result; + } + } catch (Exception e) { + // 不处理异常 + throw new RuntimeException(e); + } + } + futuresMap.forEach((index,future)->{ + try { + ResultWarp resultWarp = future.get(); + if(resultWarp.effective){ + arr[index] = resultWarp.result; + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * 幂等任务包装类,确保任务只执行一次 + */ + private static class IdempotentTask implements Callable> { + + private final int index; + private final Callable> delegate; + private final AtomicBoolean executed = new AtomicBoolean(false); + + IdempotentTask(int index,Callable> delegate) { + this.index = index; + this.delegate = delegate; + } + + @Override + public ResultWarp call() throws Exception { + if (executed.compareAndSet(false, true)) { + return new ResultWarp<>(delegate.call(), true); + } + return new ResultWarp<>(null, false); + } + } + + /** + * 结果包装类,标记结果有效性 + */ + private static class ResultWarp{ + private final List result; + private final boolean effective; + ResultWarp(List result, boolean effective){ + this.result = result; + this.effective = effective; + } + } + + /** + * 数据分片方法 + * @param data 原始数据 + * @param batchSize 每批次数据量 + * @return 分片后的二维集合 + */ + public static List> splitData(List data, int batchSize) { + int batchCount = (data.size() + batchSize - 1) / batchSize; + return new AbstractList>() { + @Override + public List get(int index) { + int from = index * batchSize; + int to = Math.min((index + 1) * batchSize, data.size()); + return data.subList(from, to); + } + + @Override + public int size() { + return batchCount; + } + }; + } + + /** + * 单批次数据处理 + * @param batch 单批次数据 + * @param processor 处理函数 + * @return 处理结果 + */ + public static List processBatch(List batch, Function processor) { + return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList()); + } + +} From 2d7a64f660f8fb23cf5a06493d60e5665935c63b Mon Sep 17 00:00:00 2001 From: likuan Date: Tue, 20 May 2025 16:29:38 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=E6=B5=8B=E8=AF=95=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...RecyclableBatchThreadPoolExecutorTest.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java diff --git a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java new file mode 100644 index 000000000..3fde81eac --- /dev/null +++ b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java @@ -0,0 +1,76 @@ +package cn.hutool.core.thread; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; + +/** + * {@link RecyclableBatchThreadPoolExecutor} 测试类 + */ +public class RecyclableBatchThreadPoolExecutorTest { + + @Test + public void test() throws InterruptedException { + int corePoolSize = 10;// 线程池大小 + int batchSize = 100;// 每批次数据量 + int clientCount = 30;// 调用者数量 + test(corePoolSize,batchSize,clientCount); + } + + public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{ + RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize); + // 模拟多个调用者线程提交任务 + ExecutorService testExecutor = Executors.newFixedThreadPool(clientCount); + Map> map = new HashMap<>(); + for(int i = 0; i < clientCount; i++){ + map.put(i,testDate(1000)); + } + long s = System.nanoTime(); + List> futures = new ArrayList<>(); + for (int j = 0; j < clientCount; j++) { + final int clientId = j; + Future submit = testExecutor.submit(() -> { + Function function = p -> { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Thread.currentThread().getName() + "#" + p; + }; + long start = System.nanoTime(); + List process = processor.process(map.get(clientId), batchSize, function); + long duration = System.nanoTime() - start; + System.out.printf("【clientId:%s】处理结果:%s\n处理耗时:%.2f秒%n", clientId, process, duration / 1e9); + }); + futures.add(submit); + } + futures.forEach(p-> { + try { + p.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + long d = System.nanoTime() - s; + System.out.printf("总耗时:%.2f秒%n",d/1e9); + testExecutor.shutdown(); + processor.shutdown(); + } + public static List testDate(int count){ + List list = new ArrayList<>(); + for(int i = 1;i<=count;i++){ + list.add(i); + } + return list; + } + +} From d1988d4db9568eab3394785e2f3e380d941de3cc Mon Sep 17 00:00:00 2001 From: likuan Date: Wed, 21 May 2025 13:46:51 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=EF=BC=8C=E6=9B=B4=E6=8D=A2=E4=BB=BB=E5=8A=A1=E9=98=9F?= =?UTF-8?q?=E5=88=97=EF=BC=8C=E5=AE=8C=E5=96=84=E6=8E=A5=E5=8F=A3=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RecyclableBatchThreadPoolExecutor.java | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java index f6a3dc20f..a2aba2eb8 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -10,9 +10,12 @@ import java.util.stream.Stream; /** * 可召回批处理线程池执行器 + *
  * 1.数据分批并行处理
- * 2.线程安全,可用同时执行多个任务
- * 3.主线程、线程池混合执行,主线程空闲时会尝试召回线程池队列中的任务执行,无需担心任务阻塞
+ * 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
+ * 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
+ * 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积
+ * 
* * @author likuan */ @@ -25,8 +28,10 @@ public class RecyclableBatchThreadPoolExecutor { /** * 建议的构造方法 - * 使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 - * 假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化 + *
+	 * 1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
+	 * 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
+	 * 
* * @param poolSize 线程池大小 * @param threadPoolPrefix 线程名前缀 @@ -61,11 +66,22 @@ public class RecyclableBatchThreadPoolExecutor { executor.shutdown(); } + /** + * 获取线程池 + * @return ExecutorService + */ + public ExecutorService getExecutor(){ + return executor; + } + /** * 分批次处理数据 + *
 	 * 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果{@link Function}返回null即可
-	 * 2.异常在{@link Function}中自行处理
-	 * 3.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 2.{@link Function}需自行处理异常、保证线程安全
+	 * 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝
+	 * 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 
* * @param 输入数据类型 * @param 输出数据类型 @@ -81,10 +97,11 @@ public class RecyclableBatchThreadPoolExecutor { } List> batches = splitData(data, batchSize); int batchCount = batches.size(); - ConcurrentLinkedQueue> taskQueue = new ConcurrentLinkedQueue<>(); + int minusOne = batchCount - 1; + ArrayDeque> taskQueue = new ArrayDeque<>(minusOne); Map>> futuresMap = new HashMap<>(); // 提交前 batchCount-1 批任务 - for (int i = 0 ; i < batchCount-1 ; i++) { + for (int i = 0 ; i < minusOne ; i++) { final int index = i; IdempotentTask task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor)); taskQueue.add(task); @@ -92,7 +109,7 @@ public class RecyclableBatchThreadPoolExecutor { } Object[] arr = new Object[batchCount]; // 处理最后一批 - arr[batchCount-1] = processBatch(batches.get(batchCount-1), processor); + arr[minusOne] = processBatch(batches.get(minusOne), processor); // 处理剩余任务 processRemainingTasks(taskQueue, futuresMap,arr); //排序、过滤null From 7a2ef283ff44576286e72df8cedc54d0674a0da0 Mon Sep 17 00:00:00 2001 From: likuan Date: Thu, 22 May 2025 14:44:46 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E5=8C=85=E8=A3=85=E7=B1=BB=EF=BC=8C=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=8C=85=E8=A3=85=E7=B1=BB=E5=A4=84=E7=90=86=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RecyclableBatchThreadPoolExecutor.java | 115 ++++++++++++++---- ...RecyclableBatchThreadPoolExecutorTest.java | 47 ++++++- 2 files changed, 134 insertions(+), 28 deletions(-) diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java index a2aba2eb8..df16985c2 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -5,6 +5,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -14,12 +15,18 @@ import java.util.stream.Stream; * 1.数据分批并行处理 * 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行 * 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可 - * 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积 + * + * + * 适用场景: + *
+ * 1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积 {@link #process(List, int, Function)}
+ * 2.普通查询接口加速 {@link #processByWarp(Warp[])}
  * 
* * @author likuan */ public class RecyclableBatchThreadPoolExecutor { + private final ExecutorService executor; public RecyclableBatchThreadPoolExecutor(int poolSize){ @@ -90,7 +97,6 @@ public class RecyclableBatchThreadPoolExecutor { * @param processor 单条数据处理函数 * @return 处理结果集合 */ - @SuppressWarnings("unchecked") public List process(List data, int batchSize, Function processor) { if (batchSize < 1) { throw new IllegalArgumentException("batchSize >= 1"); @@ -99,7 +105,7 @@ public class RecyclableBatchThreadPoolExecutor { int batchCount = batches.size(); int minusOne = batchCount - 1; ArrayDeque> taskQueue = new ArrayDeque<>(minusOne); - Map>> futuresMap = new HashMap<>(); + Map>> futuresMap = new HashMap<>(); // 提交前 batchCount-1 批任务 for (int i = 0 ; i < minusOne ; i++) { final int index = i; @@ -107,15 +113,15 @@ public class RecyclableBatchThreadPoolExecutor { taskQueue.add(task); futuresMap.put(i,executor.submit(task)); } - Object[] arr = new Object[batchCount]; + @SuppressWarnings("unchecked") + List[] resultArr = new ArrayList[batchCount]; // 处理最后一批 - arr[minusOne] = processBatch(batches.get(minusOne), processor); + resultArr[minusOne] = processBatch(batches.get(minusOne), processor); // 处理剩余任务 - processRemainingTasks(taskQueue, futuresMap,arr); + processRemainingTasks(taskQueue, futuresMap,resultArr); //排序、过滤null - return Stream.of(arr) + return Stream.of(resultArr) .filter(Objects::nonNull) - .map(p -> (List) p) .flatMap(List::stream) .collect(Collectors.toList()); } @@ -124,20 +130,20 @@ public class RecyclableBatchThreadPoolExecutor { * 处理剩余任务并收集结果 * @param taskQueue 任务队列 * @param futuresMap 异步任务映射 - * @param arr 结果存储数组 + * @param resultArr 结果存储数组 */ - private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap,Object[] arr) { + private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap, List[] resultArr) { // 主消费未执行任务 IdempotentTask task; while ((task = taskQueue.poll()) != null) { try { - ResultWarp call = task.call(); + TaskResult call = task.call(); if (call.effective) { // 取消被主线程执行任务 - Future> future = futuresMap.remove(task.index); + Future> future = futuresMap.remove(task.index); future.cancel(false); //加入结果集 - arr[task.index] = call.result; + resultArr[task.index] = call.result; } } catch (Exception e) { // 不处理异常 @@ -146,9 +152,9 @@ public class RecyclableBatchThreadPoolExecutor { } futuresMap.forEach((index,future)->{ try { - ResultWarp resultWarp = future.get(); - if(resultWarp.effective){ - arr[index] = resultWarp.result; + TaskResult taskResult = future.get(); + if(taskResult.effective){ + resultArr[index] = taskResult.result; } } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); @@ -159,7 +165,7 @@ public class RecyclableBatchThreadPoolExecutor { /** * 幂等任务包装类,确保任务只执行一次 */ - private static class IdempotentTask implements Callable> { + private static class IdempotentTask implements Callable> { private final int index; private final Callable> delegate; @@ -171,21 +177,21 @@ public class RecyclableBatchThreadPoolExecutor { } @Override - public ResultWarp call() throws Exception { + public TaskResult call() throws Exception { if (executed.compareAndSet(false, true)) { - return new ResultWarp<>(delegate.call(), true); + return new TaskResult<>(delegate.call(), true); } - return new ResultWarp<>(null, false); + return new TaskResult<>(null, false); } } /** * 结果包装类,标记结果有效性 */ - private static class ResultWarp{ + private static class TaskResult{ private final List result; private final boolean effective; - ResultWarp(List result, boolean effective){ + TaskResult(List result, boolean effective){ this.result = result; this.effective = effective; } @@ -197,7 +203,7 @@ public class RecyclableBatchThreadPoolExecutor { * @param batchSize 每批次数据量 * @return 分片后的二维集合 */ - public static List> splitData(List data, int batchSize) { + private static List> splitData(List data, int batchSize) { int batchCount = (data.size() + batchSize - 1) / batchSize; return new AbstractList>() { @Override @@ -220,8 +226,69 @@ public class RecyclableBatchThreadPoolExecutor { * @param processor 处理函数 * @return 处理结果 */ - public static List processBatch(List batch, Function processor) { + private static List processBatch(List batch, Function processor) { return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList()); } + /** + * 处理Warp集合 + * + *
{@code
+	 * Warp warp1 = Warp.of(this::select1);
+	 * Warp> warp2 = Warp.of(this::select2);
+	 * executor.processByWarp(warp1, warp2);
+	 * String r1 = warp1.get();
+	 * List r2 = warp2.get();
+	 * }
+ * + * @param warps Warp集合 + * @return Warp集合,此方法返回结果为空的不会被过滤 + */ + public List> processByWarp(Warp... warps) { + return process(Arrays.asList(warps), 1, Warp::execute); + } + + /** + * 处理逻辑包装类 + * @param 结果类型 + */ + public static class Warp{ + + private Warp(Supplier supplier){ + Objects.requireNonNull(supplier); + this.supplier = supplier; + } + + /** + * 创建Warp + * @param supplier 执行逻辑 + * @return Warp + * @param 结果类型 + */ + public static Warp of(Supplier supplier){ + return new Warp<>(supplier); + } + + private final Supplier supplier; + private R result; + + /** + * 获取结果 + * @return 结果 + */ + public R get() { + return result; + } + + /** + * 执行 + * @return this + */ + public Warp execute() { + result = supplier.get(); + return this; + } + + } + } diff --git a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java index 3fde81eac..d8dc2f182 100644 --- a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java +++ b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java @@ -1,11 +1,9 @@ package cn.hutool.core.thread; +import cn.hutool.core.thread.RecyclableBatchThreadPoolExecutor.Warp; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,6 +15,11 @@ import java.util.function.Function; */ public class RecyclableBatchThreadPoolExecutorTest { + + /** + * 批量处理数据 + * @throws InterruptedException + */ @Test public void test() throws InterruptedException { int corePoolSize = 10;// 线程池大小 @@ -25,6 +28,24 @@ public class RecyclableBatchThreadPoolExecutorTest { test(corePoolSize,batchSize,clientCount); } + /** + * 普通查询接口加速 + */ + @Test + public void test2() { + RecyclableBatchThreadPoolExecutor executor = new RecyclableBatchThreadPoolExecutor(10); + long s = System.nanoTime(); + Warp warp1 = Warp.of(this::select1); + Warp> warp2 = Warp.of(this::select2); + executor.processByWarp(warp1, warp2); + Map map = new HashMap<>(); + map.put("key1",warp1.get()); + map.put("key2",warp2.get()); + long d = System.nanoTime() - s; + System.out.printf("总耗时:%.2f秒%n",d/1e9); + System.out.println(map); + } + public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{ RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize); // 模拟多个调用者线程提交任务 @@ -73,4 +94,22 @@ public class RecyclableBatchThreadPoolExecutorTest { return list; } + private String select1() { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "1"; + } + + private List select2() { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Arrays.asList("1","2","3"); + } + } From 1f2dc4fd3aa132260550b43285a612a99027bd35 Mon Sep 17 00:00:00 2001 From: likuan Date: Thu, 22 May 2025 15:04:55 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=8C=85=E8=A3=85=E7=B1=BB?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../thread/RecyclableBatchThreadPoolExecutor.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java index df16985c2..97b55ccbc 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -231,7 +231,7 @@ public class RecyclableBatchThreadPoolExecutor { } /** - * 处理Warp集合 + * 处理Warp数组 * *
{@code
 	 * Warp warp1 = Warp.of(this::select1);
@@ -241,11 +241,20 @@ public class RecyclableBatchThreadPoolExecutor {
 	 * List r2 = warp2.get();
 	 * }
* - * @param warps Warp集合 + * @param warps Warp数组 * @return Warp集合,此方法返回结果为空的不会被过滤 */ public List> processByWarp(Warp... warps) { - return process(Arrays.asList(warps), 1, Warp::execute); + return processByWarp(Arrays.asList(warps)); + } + + /** + * 处理Warp集合 + * @param warps Warp集合 + * @return Warp集合,此方法返回结果为空的不会被过滤 + */ + public List> processByWarp(List> warps) { + return process(warps, 1, Warp::execute); } /**