This commit is contained in:
Looly
2020-02-21 18:56:42 +08:00
parent fc8dfc8fb1
commit 0553c5ca0b
12 changed files with 278 additions and 99 deletions

View File

@@ -42,7 +42,8 @@ public class FileTypeUtil {
fileTypeMap.put("255044462d312e", "pdf"); // Adobe Acrobat (pdf)
fileTypeMap.put("2e524d46000000120001", "rmvb"); // rmvb/rm相同
fileTypeMap.put("464c5601050000000900", "flv"); // flv与f4v相同
fileTypeMap.put("00000020667479706d70", "mp4");
fileTypeMap.put("00000020667479706", "mp4");
fileTypeMap.put("00000018667479706D70", "mp4");
fileTypeMap.put("49443303000000002176", "mp3");
fileTypeMap.put("000001ba210001000180", "mpg"); //
fileTypeMap.put("3026b2758e66cf11a6d9", "wmv"); // wmv与asf相同

View File

@@ -0,0 +1,88 @@
package cn.hutool.core.thread;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* ExecutorService代理
*
* @author loolly
*/
public class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) {
e = executor;
}
@SuppressWarnings("NullableProblems")
public void execute(Runnable command) {
e.execute(command);
}
public void shutdown() {
e.shutdown();
}
@SuppressWarnings("NullableProblems")
public List<Runnable> shutdownNow() {
return e.shutdownNow();
}
public boolean isShutdown() {
return e.isShutdown();
}
public boolean isTerminated() {
return e.isTerminated();
}
@SuppressWarnings("NullableProblems")
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
@SuppressWarnings("NullableProblems")
public Future<?> submit(Runnable task) {
return e.submit(task);
}
@SuppressWarnings("NullableProblems")
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
@SuppressWarnings("NullableProblems")
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
@SuppressWarnings("NullableProblems")
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return e.invokeAll(tasks);
}
@SuppressWarnings("NullableProblems")
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
@SuppressWarnings("NullableProblems")
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

View File

@@ -1,6 +1,8 @@
package cn.hutool.core.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
@@ -14,31 +16,48 @@ import cn.hutool.core.util.ObjectUtil;
/**
* {@link ThreadPoolExecutor} 建造者
*
*
* @author looly
* @since 4.1.9
*/
public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
private static final long serialVersionUID = 1L;
/** 初始池大小 */
/** 默认的等待队列容量 */
public static final int DEFAULT_QUEUE_CAPACITY = 1024;
/**
* 初始池大小
*/
private int corePoolSize;
/** 最大池大小(允许同时执行的最大线程数) */
/**
* 最大池大小(允许同时执行的最大线程数)
*/
private int maxPoolSize = Integer.MAX_VALUE;
/** 线程存活时间,即当池中线程多于初始大小时,多出的线程保留的时长 */
/**
* 线程存活时间,即当池中线程多于初始大小时,多出的线程保留的时长
*/
private long keepAliveTime = TimeUnit.SECONDS.toNanos(60);
/** 队列,用于存在未执行的线程 */
/**
* 队列,用于存在未执行的线程
*/
private BlockingQueue<Runnable> workQueue;
/** 线程工厂,用于自定义线程创建 */
/**
* 线程工厂,用于自定义线程创建
*/
private ThreadFactory threadFactory;
/** 当线程阻塞block时的异常处理器所谓线程阻塞即线程池和等待队列已满无法处理线程时采取的策略 */
/**
* 当线程阻塞block时的异常处理器所谓线程阻塞即线程池和等待队列已满无法处理线程时采取的策略
*/
private RejectedExecutionHandler handler;
/** 线程执行超时后是否回收线程 */
/**
* 线程执行超时后是否回收线程
*/
private Boolean allowCoreThreadTimeOut;
/**
* 设置初始池大小默认0
*
*
* @param corePoolSize 初始池大小
* @return this
*/
@@ -49,7 +68,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置最大池大小(允许同时执行的最大线程数)
*
*
* @param maxPoolSize 最大池大小(允许同时执行的最大线程数)
* @return this
*/
@@ -60,9 +79,9 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置线程存活时间,即当池中线程多于初始大小时,多出的线程保留的时长
*
*
* @param keepAliveTime 线程存活时间
* @param unit 单位
* @param unit 单位
* @return this
*/
public ExecutorBuilder setKeepAliveTime(long keepAliveTime, TimeUnit unit) {
@@ -71,7 +90,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置线程存活时间,即当池中线程多于初始大小时,多出的线程保留的时长,单位纳秒
*
*
* @param keepAliveTime 线程存活时间,单位纳秒
* @return this
*/
@@ -83,13 +102,14 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置队列,用于存在未执行的线程<br>
* 可选队列有:
*
*
* <pre>
* 1. SynchronousQueue 它将任务直接提交给线程而不保持它们。当运行线程小于maxPoolSize时会创建新线程否则触发异常策略
* 2. LinkedBlockingQueue 无界队列当运行线程大于corePoolSize时始终放入此队列此时maximumPoolSize无效
* 3. ArrayBlockingQueue 有界队列,相对无界队列有利于控制队列大小队列满时运行线程小于maxPoolSize时会创建新线程否则触发异常策略
* 1. {@link SynchronousQueue} 它将任务直接提交给线程而不保持它们。当运行线程小于maxPoolSize时会创建新线程否则触发异常策略
* 2. {@link LinkedBlockingQueue} 默认无界队列当运行线程大于corePoolSize时始终放入此队列此时maximumPoolSize无效
* 当构造LinkedBlockingQueue对象时传入参数变为有界队列队列满时运行线程小于maxPoolSize时会创建新线程否则触发异常策略
* 3. {@link ArrayBlockingQueue} 有界队列相对无界队列有利于控制队列大小队列满时运行线程小于maxPoolSize时会创建新线程否则触发异常策略
* </pre>
*
*
* @param workQueue 队列
* @return this
*/
@@ -98,10 +118,22 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
/**
* 使用{@link ArrayBlockingQueue} 做为等待队列<br>
* 有界队列相对无界队列有利于控制队列大小队列满时运行线程小于maxPoolSize时会创建新线程否则触发异常策略
*
* @param capacity 队列容量
* @return this
* @since 5.1.4
*/
public ExecutorBuilder useArrayBlockingQueue(int capacity) {
return setWorkQueue(new ArrayBlockingQueue<>(capacity));
}
/**
* 使用{@link SynchronousQueue} 做为等待队列(非公平策略)<br>
* 它将任务直接提交给线程而不保持它们。当运行线程小于maxPoolSize时会创建新线程否则触发异常策略
*
*
* @return this
* @since 4.1.11
*/
@@ -112,7 +144,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 使用{@link SynchronousQueue} 做为等待队列<br>
* 它将任务直接提交给线程而不保持它们。当运行线程小于maxPoolSize时会创建新线程否则触发异常策略
*
*
* @param fair 是否使用公平访问策略
* @return this
* @since 4.5.0
@@ -123,7 +155,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置线程工厂,用于自定义线程创建
*
*
* @param threadFactory 线程工厂
* @return this
* @see ThreadFactoryBuilder
@@ -137,7 +169,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
* 设置当线程阻塞block时的异常处理器所谓线程阻塞即线程池和等待队列已满无法处理线程时采取的策略
* <p>
* 此处可以使用JDK预定义的几种策略见{@link RejectPolicy}枚举
*
*
* @param handler {@link RejectedExecutionHandler}
* @return this
* @see RejectPolicy
@@ -149,7 +181,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 设置线程执行超时后是否回收线程
*
*
* @param allowCoreThreadTimeOut 线程执行超时后是否回收线程
* @return this
*/
@@ -160,7 +192,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
/**
* 创建ExecutorBuilder开始构建
*
*
* @return {@link ExecutorBuilder}
*/
public static ExecutorBuilder create() {
@@ -175,9 +207,19 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
return build(this);
}
/**
* 创建有回收关闭功能的ExecutorService
*
* @return 创建有回收关闭功能的ExecutorService
* @since 5.1.4
*/
public ExecutorService buildFinalizable() {
return new FinalizableDelegatedExecutorService(build());
}
/**
* 构建ThreadPoolExecutor
*
*
* @param builder {@link ExecutorBuilder}
* @return {@link ThreadPoolExecutor}
*/
@@ -190,7 +232,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
workQueue = builder.workQueue;
} else {
// corePoolSize为0则要使用SynchronousQueue避免无限阻塞
workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>();
workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
}
final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());

View File

@@ -0,0 +1,19 @@
package cn.hutool.core.thread;
import java.util.concurrent.ExecutorService;
/**
* 保证ExecutorService在对象回收时正常结束
*
* @author loolly
*/
public class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
@Override
protected void finalize() {
super.shutdown();
}
}

View File

@@ -6,7 +6,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +47,11 @@ public class ThreadUtil {
* @return ExecutorService
*/
public static ExecutorService newSingleExecutor() {
return Executors.newSingleThreadExecutor();
return ExecutorBuilder.create()//
.setCorePoolSize(1)//
.setMaxPoolSize(1)//
.setKeepAliveTime(0)//
.buildFinalizable();
}
/**