Java 线程池详解
深入理解线程池原理、参数配置与并发性能优化
目录
为什么使用线程池
创建和销毁线程是有开销的。线程池通过复用线程,减少了线程创建和销毁的开销,提高了系统性能。
优点:
- 降低资源消耗
- 提高响应速度
- 便于线程管理
- 提高系统稳定性
线程池核心参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)参数详解
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数,即使空闲也保留 |
| maximumPoolSize | 最大线程数,队列满时创建 |
| keepAliveTime | 非核心线程空闲存活时间 |
| workQueue | 任务等待队列 |
| threadFactory | 创建线程的工厂 |
| handler | 任务拒绝策略 |
创建线程池
方式一:Executors 工厂方法(不推荐)
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
// 缓存线程池(可动态调整)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);为什么不推荐? Executors 创建的线程池有潜在风险:
newFixedThreadPool和newSingleThreadExecutor使用无界队列,可能导致 OOMnewCachedThreadPool允许无限创建线程,可能导致资源耗尽
方式二:ThreadPoolExecutor 手动创建(推荐)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 有界队列,容量100
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "custom-pool-" + count.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);方式三:ThreadPoolTaskExecutor(Spring)
@Configuration
public class ThreadPoolConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}任务队列
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| ArrayBlockingQueue | 有界数组队列 | 需要限制队列大小 |
| LinkedBlockingQueue | 有界/无界链表队列 | 最常用,灵活控制 |
| SynchronousQueue | 不存储元素 | 直接提交给线程 |
| PriorityBlockingQueue | 优先级队列 | 任务有优先级 |
拒绝策略
// 1. AbortPolicy(默认)- 抛出异常
new ThreadPoolExecutor.AbortPolicy();
// 2. CallerRunsPolicy - 由调用线程执行
new ThreadPoolExecutor.CallerRunsPolicy();
// 3. DiscardPolicy - 静默丢弃
new ThreadPoolExecutor.DiscardPolicy();
// 4. DiscardOldestPolicy - 丢弃最老任务
new ThreadPoolExecutor.DiscardOldestPolicy();
// 5. 自定义拒绝策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志、持久化到数据库等
System.out.println("任务被拒绝: " + r.toString());
}
};提交任务
// 1. execute() - 无返回值
executor.execute(() -> {
System.out.println("执行任务");
});
// 2. submit() - 返回 Future
Future<Integer> future = executor.submit(() -> {
return 42;
});
Integer result = future.get(); // 阻塞等待结果
// 3. invokeAll() - 批量提交,等待所有完成
List<Callable<Integer>> tasks = Arrays.asList(
() -> 1, () -> 2, () -> 3
);
List<Future<Integer>> futures = executor.invokeAll(tasks);
// 4. invokeAny() - 批量提交,返回最先完成的
Integer anyResult = executor.invokeAny(tasks);关闭线程池
// 优雅关闭
public void shutdownGracefully(ExecutorService executor) {
// 1. 停止接受新任务
executor.shutdown();
try {
// 2. 等待现有任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 3. 超时后强制关闭
executor.shutdownNow();
// 4. 再次等待
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}线程池监控
public void printThreadPoolStatus(ThreadPoolExecutor executor) {
System.out.println("线程池状态:");
System.out.println(" 核心线程数: " + executor.getCorePoolSize());
System.out.println(" 最大线程数: " + executor.getMaximumPoolSize());
System.out.println(" 当前线程数: " + executor.getPoolSize());
System.out.println(" 活跃线程数: " + executor.getActiveCount());
System.out.println(" 任务队列大小: " + executor.getQueue().size());
System.out.println(" 已完成任务数: " + executor.getCompletedTaskCount());
System.out.println(" 总任务数: " + executor.getTaskCount());
}最佳实践
1. 合理设置线程池大小
// CPU 密集型:核心数 + 1
int cpuCorePoolSize = Runtime.getRuntime().availableProcessors() + 1;
// IO 密集型:核心数 * 2 或根据公式计算
int ioCorePoolSize = Runtime.getRuntime().availableProcessors() * 2;
// 更精确的计算
// 线程数 = CPU核心数 / (1 - 阻塞系数) 阻塞系数:0.8~0.92. 使用有界队列
// 推荐:使用 ArrayBlockingQueue 或指定容量的 LinkedBlockingQueue
new LinkedBlockingQueue<>(1000) // 明确指定容量3. 自定义线程工厂
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("custom-pool-%d")
.setDaemon(false)
.setPriority(Thread.NORM_PRIORITY)
.build();4. 包装 Runnable/Callable
// 包装任务以记录执行时间
public class TimingTask implements Runnable {
private final Runnable task;
@Override
public void run() {
long start = System.currentTimeMillis();
try {
task.run();
} finally {
long duration = System.currentTimeMillis() - start;
System.out.println("任务执行时间: " + duration + "ms");
}
}
}5. 使用 CompletableFuture(Java 8+)
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture.supplyAsync(() -> fetchData(), executor)
.thenApply(data -> processData(data))
.thenAccept(result -> saveResult(result));常见问题
1. 线程池死锁
// 错误示例:在任务内提交并等待新任务
executor.submit(() -> {
Future<Integer> future = executor.submit(() -> 1);
return future.get(); // 可能导致死锁
});2. 异常处理
// 使用包装类捕获异常
executor.execute(() -> {
try {
// 业务逻辑
} catch (Exception e) {
// 记录日志
log.error("任务执行异常", e);
}
});总结
线程池是 Java 并发编程的核心工具,正确配置和使用线程池可以显著提升系统性能。记住:
- 使用
ThreadPoolExecutor手动创建线程池 - 使用有界队列防止 OOM
- 设置合理的拒绝策略
- 正确关闭线程池
- 监控线程池状态