diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java
new file mode 100644
index 000000000..97b55ccbc
--- /dev/null
+++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java
@@ -0,0 +1,303 @@
+package cn.hutool.core.thread;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * 可召回批处理线程池执行器
+ *
+ * 1.数据分批并行处理
+ * 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
+ * 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
+ *
+ *
+ * 适用场景:
+ *
+ * 1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积 {@link #process(List, int, Function)}
+ * 2.普通查询接口加速 {@link #processByWarp(Warp[])}
+ *
+ *
+ * @author likuan
+ */
+public class RecyclableBatchThreadPoolExecutor {
+
+ private final ExecutorService executor;
+
+ public RecyclableBatchThreadPoolExecutor(int poolSize){
+ this(poolSize,"recyclable-batch-pool-");
+ }
+
+ /**
+ * 建议的构造方法
+ *
+ * 1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
+ * 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
+ *
+ *
+ * @param poolSize 线程池大小
+ * @param threadPoolPrefix 线程名前缀
+ */
+ public RecyclableBatchThreadPoolExecutor(int poolSize, String threadPoolPrefix){
+ AtomicInteger threadNumber = new AtomicInteger(1);
+ ThreadFactory threadFactory = r -> {
+ Thread t = new Thread(r, threadPoolPrefix + threadNumber.getAndIncrement());
+ if (t.isDaemon()) {
+ t.setDaemon(false);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ };
+ this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),threadFactory);
+ }
+
+ /**
+ * 自定义线程池,一般不需要使用
+ * @param executor 线程池
+ */
+ public RecyclableBatchThreadPoolExecutor(ExecutorService executor){
+ this.executor = executor;
+ }
+
+ /**
+ * 关闭线程池
+ */
+ public void shutdown(){
+ executor.shutdown();
+ }
+
+ /**
+ * 获取线程池
+ * @return ExecutorService
+ */
+ public ExecutorService getExecutor(){
+ return executor;
+ }
+
+ /**
+ * 分批次处理数据
+ *
+ * 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果{@link Function}返回null即可
+ * 2.{@link Function}需自行处理异常、保证线程安全
+ * 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝
+ * 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+ *
+ *
+ * @param 输入数据类型
+ * @param 输出数据类型
+ * @param data 待处理数据集合
+ * @param batchSize 每批次数据量
+ * @param processor 单条数据处理函数
+ * @return 处理结果集合
+ */
+ public List process(List data, int batchSize, Function processor) {
+ if (batchSize < 1) {
+ throw new IllegalArgumentException("batchSize >= 1");
+ }
+ List> batches = splitData(data, batchSize);
+ int batchCount = batches.size();
+ int minusOne = batchCount - 1;
+ ArrayDeque> taskQueue = new ArrayDeque<>(minusOne);
+ Map>> futuresMap = new HashMap<>();
+ // 提交前 batchCount-1 批任务
+ for (int i = 0 ; i < minusOne ; i++) {
+ final int index = i;
+ IdempotentTask task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor));
+ taskQueue.add(task);
+ futuresMap.put(i,executor.submit(task));
+ }
+ @SuppressWarnings("unchecked")
+ List[] resultArr = new ArrayList[batchCount];
+ // 处理最后一批
+ resultArr[minusOne] = processBatch(batches.get(minusOne), processor);
+ // 处理剩余任务
+ processRemainingTasks(taskQueue, futuresMap,resultArr);
+ //排序、过滤null
+ return Stream.of(resultArr)
+ .filter(Objects::nonNull)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * 处理剩余任务并收集结果
+ * @param taskQueue 任务队列
+ * @param futuresMap 异步任务映射
+ * @param resultArr 结果存储数组
+ */
+ private void processRemainingTasks(Queue> taskQueue, Map>> futuresMap, List[] resultArr) {
+ // 主消费未执行任务
+ IdempotentTask task;
+ while ((task = taskQueue.poll()) != null) {
+ try {
+ TaskResult call = task.call();
+ if (call.effective) {
+ // 取消被主线程执行任务
+ Future> future = futuresMap.remove(task.index);
+ future.cancel(false);
+ //加入结果集
+ resultArr[task.index] = call.result;
+ }
+ } catch (Exception e) {
+ // 不处理异常
+ throw new RuntimeException(e);
+ }
+ }
+ futuresMap.forEach((index,future)->{
+ try {
+ TaskResult taskResult = future.get();
+ if(taskResult.effective){
+ resultArr[index] = taskResult.result;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * 幂等任务包装类,确保任务只执行一次
+ */
+ private static class IdempotentTask implements Callable> {
+
+ private final int index;
+ private final Callable> delegate;
+ private final AtomicBoolean executed = new AtomicBoolean(false);
+
+ IdempotentTask(int index,Callable> delegate) {
+ this.index = index;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public TaskResult call() throws Exception {
+ if (executed.compareAndSet(false, true)) {
+ return new TaskResult<>(delegate.call(), true);
+ }
+ return new TaskResult<>(null, false);
+ }
+ }
+
+ /**
+ * 结果包装类,标记结果有效性
+ */
+ private static class TaskResult{
+ private final List result;
+ private final boolean effective;
+ TaskResult(List result, boolean effective){
+ this.result = result;
+ this.effective = effective;
+ }
+ }
+
+ /**
+ * 数据分片方法
+ * @param data 原始数据
+ * @param batchSize 每批次数据量
+ * @return 分片后的二维集合
+ */
+ private static List> splitData(List data, int batchSize) {
+ int batchCount = (data.size() + batchSize - 1) / batchSize;
+ return new AbstractList>() {
+ @Override
+ public List get(int index) {
+ int from = index * batchSize;
+ int to = Math.min((index + 1) * batchSize, data.size());
+ return data.subList(from, to);
+ }
+
+ @Override
+ public int size() {
+ return batchCount;
+ }
+ };
+ }
+
+ /**
+ * 单批次数据处理
+ * @param batch 单批次数据
+ * @param processor 处理函数
+ * @return 处理结果
+ */
+ private static List processBatch(List batch, Function processor) {
+ return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ /**
+ * 处理Warp数组
+ *
+ * {@code
+ * Warp warp1 = Warp.of(this::select1);
+ * Warp> warp2 = Warp.of(this::select2);
+ * executor.processByWarp(warp1, warp2);
+ * String r1 = warp1.get();
+ * List r2 = warp2.get();
+ * }
+ *
+ * @param warps Warp数组
+ * @return Warp集合,此方法返回结果为空的不会被过滤
+ */
+ public List> processByWarp(Warp>... warps) {
+ return processByWarp(Arrays.asList(warps));
+ }
+
+ /**
+ * 处理Warp集合
+ * @param warps Warp集合
+ * @return Warp集合,此方法返回结果为空的不会被过滤
+ */
+ public List> processByWarp(List> warps) {
+ return process(warps, 1, Warp::execute);
+ }
+
+ /**
+ * 处理逻辑包装类
+ * @param 结果类型
+ */
+ public static class Warp{
+
+ private Warp(Supplier supplier){
+ Objects.requireNonNull(supplier);
+ this.supplier = supplier;
+ }
+
+ /**
+ * 创建Warp
+ * @param supplier 执行逻辑
+ * @return Warp
+ * @param 结果类型
+ */
+ public static Warp of(Supplier supplier){
+ return new Warp<>(supplier);
+ }
+
+ private final Supplier supplier;
+ private R result;
+
+ /**
+ * 获取结果
+ * @return 结果
+ */
+ public R get() {
+ return result;
+ }
+
+ /**
+ * 执行
+ * @return this
+ */
+ public Warp execute() {
+ result = supplier.get();
+ return this;
+ }
+
+ }
+
+}
diff --git a/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java
new file mode 100644
index 000000000..d8dc2f182
--- /dev/null
+++ b/hutool-core/src/test/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutorTest.java
@@ -0,0 +1,115 @@
+package cn.hutool.core.thread;
+
+import cn.hutool.core.thread.RecyclableBatchThreadPoolExecutor.Warp;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
+/**
+ * {@link RecyclableBatchThreadPoolExecutor} 测试类
+ */
+public class RecyclableBatchThreadPoolExecutorTest {
+
+
+ /**
+ * 批量处理数据
+ * @throws InterruptedException
+ */
+ @Test
+ public void test() throws InterruptedException {
+ int corePoolSize = 10;// 线程池大小
+ int batchSize = 100;// 每批次数据量
+ int clientCount = 30;// 调用者数量
+ test(corePoolSize,batchSize,clientCount);
+ }
+
+ /**
+ * 普通查询接口加速
+ */
+ @Test
+ public void test2() {
+ RecyclableBatchThreadPoolExecutor executor = new RecyclableBatchThreadPoolExecutor(10);
+ long s = System.nanoTime();
+ Warp warp1 = Warp.of(this::select1);
+ Warp> warp2 = Warp.of(this::select2);
+ executor.processByWarp(warp1, warp2);
+ Map map = new HashMap<>();
+ map.put("key1",warp1.get());
+ map.put("key2",warp2.get());
+ long d = System.nanoTime() - s;
+ System.out.printf("总耗时:%.2f秒%n",d/1e9);
+ System.out.println(map);
+ }
+
+ public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{
+ RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize);
+ // 模拟多个调用者线程提交任务
+ ExecutorService testExecutor = Executors.newFixedThreadPool(clientCount);
+ Map> map = new HashMap<>();
+ for(int i = 0; i < clientCount; i++){
+ map.put(i,testDate(1000));
+ }
+ long s = System.nanoTime();
+ List> futures = new ArrayList<>();
+ for (int j = 0; j < clientCount; j++) {
+ final int clientId = j;
+ Future> submit = testExecutor.submit(() -> {
+ Function function = p -> {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Thread.currentThread().getName() + "#" + p;
+ };
+ long start = System.nanoTime();
+ List process = processor.process(map.get(clientId), batchSize, function);
+ long duration = System.nanoTime() - start;
+ System.out.printf("【clientId:%s】处理结果:%s\n处理耗时:%.2f秒%n", clientId, process, duration / 1e9);
+ });
+ futures.add(submit);
+ }
+ futures.forEach(p-> {
+ try {
+ p.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ long d = System.nanoTime() - s;
+ System.out.printf("总耗时:%.2f秒%n",d/1e9);
+ testExecutor.shutdown();
+ processor.shutdown();
+ }
+ public static List testDate(int count){
+ List list = new ArrayList<>();
+ for(int i = 1;i<=count;i++){
+ list.add(i);
+ }
+ return list;
+ }
+
+ private String select1() {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return "1";
+ }
+
+ private List select2() {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Arrays.asList("1","2","3");
+ }
+
+}