目录

Java 线程池详解

深入理解线程池原理、参数配置与并发性能优化

为什么使用线程池

创建和销毁线程是有开销的。线程池通过复用线程,减少了线程创建和销毁的开销,提高了系统性能。

优点:

  • 降低资源消耗
  • 提高响应速度
  • 便于线程管理
  • 提高系统稳定性

线程池核心参数

public ThreadPoolExecutor(
    int corePoolSize,           // 核心线程数
    int maximumPoolSize,        // 最大线程数
    long keepAliveTime,         // 空闲线程存活时间
    TimeUnit unit,              // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,        // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)

参数详解

参数说明
corePoolSize核心线程数,即使空闲也保留
maximumPoolSize最大线程数,队列满时创建
keepAliveTime非核心线程空闲存活时间
workQueue任务等待队列
threadFactory创建线程的工厂
handler任务拒绝策略

创建线程池

方式一:Executors 工厂方法(不推荐)

// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);

// 缓存线程池(可动态调整)
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);

为什么不推荐? Executors 创建的线程池有潜在风险:

  • newFixedThreadPoolnewSingleThreadExecutor 使用无界队列,可能导致 OOM
  • newCachedThreadPool 允许无限创建线程,可能导致资源耗尽

方式二:ThreadPoolExecutor 手动创建(推荐)

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                          // 核心线程数
    10,                         // 最大线程数
    60L,                        // 空闲线程存活时间
    TimeUnit.SECONDS,           // 时间单位
    new LinkedBlockingQueue<>(100),  // 有界队列,容量100
    new ThreadFactory() {
        private final AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "custom-pool-" + count.incrementAndGet());
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

方式三:ThreadPoolTaskExecutor(Spring)

@Configuration
public class ThreadPoolConfig {
    
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

任务队列

队列类型特点适用场景
ArrayBlockingQueue有界数组队列需要限制队列大小
LinkedBlockingQueue有界/无界链表队列最常用,灵活控制
SynchronousQueue不存储元素直接提交给线程
PriorityBlockingQueue优先级队列任务有优先级

拒绝策略

// 1. AbortPolicy(默认)- 抛出异常
new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy - 由调用线程执行
new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy - 静默丢弃
new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy - 丢弃最老任务
new ThreadPoolExecutor.DiscardOldestPolicy();

// 5. 自定义拒绝策略
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志、持久化到数据库等
        System.out.println("任务被拒绝: " + r.toString());
    }
};

提交任务

// 1. execute() - 无返回值
executor.execute(() -> {
    System.out.println("执行任务");
});

// 2. submit() - 返回 Future
Future<Integer> future = executor.submit(() -> {
    return 42;
});
Integer result = future.get(); // 阻塞等待结果

// 3. invokeAll() - 批量提交,等待所有完成
List<Callable<Integer>> tasks = Arrays.asList(
    () -> 1, () -> 2, () -> 3
);
List<Future<Integer>> futures = executor.invokeAll(tasks);

// 4. invokeAny() - 批量提交,返回最先完成的
Integer anyResult = executor.invokeAny(tasks);

关闭线程池

// 优雅关闭
public void shutdownGracefully(ExecutorService executor) {
    // 1. 停止接受新任务
    executor.shutdown();
    
    try {
        // 2. 等待现有任务完成
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // 3. 超时后强制关闭
            executor.shutdownNow();
            
            // 4. 再次等待
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("线程池未正常关闭");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

线程池监控

public void printThreadPoolStatus(ThreadPoolExecutor executor) {
    System.out.println("线程池状态:");
    System.out.println("  核心线程数: " + executor.getCorePoolSize());
    System.out.println("  最大线程数: " + executor.getMaximumPoolSize());
    System.out.println("  当前线程数: " + executor.getPoolSize());
    System.out.println("  活跃线程数: " + executor.getActiveCount());
    System.out.println("  任务队列大小: " + executor.getQueue().size());
    System.out.println("  已完成任务数: " + executor.getCompletedTaskCount());
    System.out.println("  总任务数: " + executor.getTaskCount());
}

最佳实践

1. 合理设置线程池大小

// CPU 密集型:核心数 + 1
int cpuCorePoolSize = Runtime.getRuntime().availableProcessors() + 1;

// IO 密集型:核心数 * 2 或根据公式计算
int ioCorePoolSize = Runtime.getRuntime().availableProcessors() * 2;

// 更精确的计算
// 线程数 = CPU核心数 / (1 - 阻塞系数)  阻塞系数0.8~0.9

2. 使用有界队列

// 推荐:使用 ArrayBlockingQueue 或指定容量的 LinkedBlockingQueue
new LinkedBlockingQueue<>(1000)  // 明确指定容量

3. 自定义线程工厂

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
    .setNameFormat("custom-pool-%d")
    .setDaemon(false)
    .setPriority(Thread.NORM_PRIORITY)
    .build();

4. 包装 Runnable/Callable

// 包装任务以记录执行时间
public class TimingTask implements Runnable {
    private final Runnable task;
    
    @Override
    public void run() {
        long start = System.currentTimeMillis();
        try {
            task.run();
        } finally {
            long duration = System.currentTimeMillis() - start;
            System.out.println("任务执行时间: " + duration + "ms");
        }
    }
}

5. 使用 CompletableFuture(Java 8+)

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> fetchData(), executor)
    .thenApply(data -> processData(data))
    .thenAccept(result -> saveResult(result));

常见问题

1. 线程池死锁

// 错误示例:在任务内提交并等待新任务
executor.submit(() -> {
    Future<Integer> future = executor.submit(() -> 1);
    return future.get(); // 可能导致死锁
});

2. 异常处理

// 使用包装类捕获异常
executor.execute(() -> {
    try {
        // 业务逻辑
    } catch (Exception e) {
        // 记录日志
        log.error("任务执行异常", e);
    }
});

总结

线程池是 Java 并发编程的核心工具,正确配置和使用线程池可以显著提升系统性能。记住:

  1. 使用 ThreadPoolExecutor 手动创建线程池
  2. 使用有界队列防止 OOM
  3. 设置合理的拒绝策略
  4. 正确关闭线程池
  5. 监控线程池状态