Executor
为什么要有线程池
- 降低资源损耗
- 提高响应速度
- 提高线程的可管理性
Java 线程池是通过 Executor
框架实现的,主要包括 Executor
接口及其实现类, 如 ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
,以及工具类 Executors
。 线程池的使用可以减少在执行大量异步任务时创建和销毁线程的开销,提高系统的响应速度和运行效率。
核心组成
Executor
接口:定义了执行提交的Runnable
任务的方法。ExecutorService
接口:是Executor
的子接口,添加了生命周期管理的方法,如shutdown()
。ScheduledExecutorService
接口:扩展了ExecutorService
,支持定时及周期性任务执行。ThreadPoolExecutor
类:ExecutorService
的实现,提供了创建各种线程池的功能。ScheduledThreadPoolExecutor
类:继承ThreadPoolExecutor
类,并实现了ScheduledExecutorService
接口,支持定时及周期性任务执行。Executors
类:工具类,提供静态方法快速创建不同类型的线程池。
Executors
Executors
是 Java 并发包中的一个工具类,提供了若干静态方法来创建不同类型的线程池。这些线程池对于执行大量异步任务提供了高效的管理方式。下面,我们将详细介绍 Executors
类提供的几种常用线程池,以及它们的使用场景和代码示例。
固定大小的线程池(Fixed Thread Pool)
- 使用场景:适用于负载较重的服务器。
- 创建方式:
Executors.newFixedThreadPool(int nThreads)
。 - 特点:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(() -> {
// 执行任务
});
}
fixedThreadPool.shutdown();
缓存线程池(Cached Thread Pool)
- 使用场景:适用于执行很多短期异步任务的程序,或者是负载较轻的服务器。
- 创建方式:
Executors.newCachedThreadPool()
。 - 特点:创建一个可根据需要创建新线程的线程池,但是在先前构建的线程可用时将重用它们。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(() -> {
// 执行任务
});
}
cachedThreadPool.shutdown();
单线程化的线程池(Single Thread Executor)
- 使用场景:适用于需要保证顺序执行各个任务的场景,并且在任意时间点,不会有多个线程是活动的。
- 创建方式:
Executors.newSingleThreadExecutor()
。 - 特点:创建一个单线程的执行器。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.execute(() -> {
// 执行任务
});
}
singleThreadExecutor.shutdown();
定时及周期性任务执行的线程池(Scheduled Thread Pool)
- 使用场景:适用于需要多个后台线程执行周期任务,同时作为定时任务的调度器。
- 创建方式:
Executors.newScheduledThreadPool(int corePoolSize)
。 - 特点:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(() -> {
// 执行任务
}, 5, TimeUnit.SECONDS);
scheduledThreadPool.shutdown();
注意事项
虽然 Executors
提供了便捷的方式来快速创建线程池,但在生产环境中,直接使用这些方法创建的线程池可能并不合适。比如,newCachedThreadPool
和 newScheduledThreadPool
默认创建的都是具有 Integer.MAX_VALUE 的队列,可能会导致 OOM(内存溢出)。newFixedThreadPool
和 newSingleThreadExecutor
默认的队列是无界的,也可能会导致 OOM。因此,在实际应用中,根据需要创建具有合理配置的 ThreadPoolExecutor
实例通常是更好的选择。
在 Java 并发编程中,当我们通过 Executors
类的 newFixedThreadPool(int nThreads)
和 newSingleThreadExecutor()
方法创建线程池时, 默认使用的工作队列是无界的。这里所说的“无界的”指的是队列的容量没有固定的上限,理论上可以无限制地添加任务。 默认情况下,这两种类型的线程池使用的是 LinkedBlockingQueue
作为其工作队列。
LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表结构的阻塞队列,除非系统资源耗尽,否则它可以无限制地接受任务。这意味着,如果任务提交速度持续超过线程池处理速度,队列会持续增长,最终可能导致内存溢出(OOM)。
示例
下面是一个简单的例子,展示了如何使用 newFixedThreadPool
创建一个固定大小的线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10000; i++) {
final int taskID = i;
fixedThreadPool.execute(() -> {
System.out.println("执行任务: " + taskID);
});
}
在这个例子中,如果这 10000 个任务被迅速提交到线程池,而线程池内部处理这些任务的速度跟不上提交的速度, 那么这些待处理的任务就会被存放在 LinkedBlockingQueue
中,队列会持续增长。
风险与解决方案
- 风险:在高负载的场景下,使用无界队列可能会导致高内存消耗,甚至
OutOfMemoryError
。 - 解决方案:
- 使用有界队列,如
ArrayBlockingQueue
,在创建ThreadPoolExecutor
时指定容量上限。 - 根据实际场景合理配置线程池的核心线程数和最大线程数,以及队列大小,避免任务积压。
- 对于超出队列容量的任务,可以采取一定的拒绝策略(如
ThreadPoolExecutor.CallerRunsPolicy
),避免无限制地提交任务。
- 使用有界队列,如
通过合理配置和管理线程池,可以有效避免因任务积压导致的内存溢出风险,确保应用的稳定性和性能。
在 Java 的 Executors
工具类中,newCachedThreadPool()
和 newScheduledThreadPool(int corePoolSize)
方法创建的线程池行为与队列大小有着直接的关系,尤其是在任务提交速度远大于处理速度的场景下。这两种类型的线程池在处理任务时使用的队列和任务调度方式有所不同,它们对内存的潜在影响也各有特点。
newCachedThreadPool
- 队列特性:
newCachedThreadPool()
方法创建的线程池实际上使用的是SynchronousQueue
。SynchronousQueue
并不是一个真正的队列,因为它不会持有元素。每个put
操作必须等待一个take
操作,反之亦然。所以,newCachedThreadPool
的队列实际上并不存储任务。 - OOM 风险:尽管
newCachedThreadPool
使用的是SynchronousQueue
,OOM 风险并不是由队列无界引起的,而是因为这种类型的线程池允许创建的线程数量理论上到达Integer.MAX_VALUE
,如果大量短期异步任务不断提交,而每个任务都创建新线线程执行,就可能导致创建过多线程,从而耗尽系统资源。
newScheduledThreadPool
- 队列特性:
newScheduledThreadPool
方法创建的线程池内部使用的是DelayedWorkQueue
,专门用于处理延迟任务或定时任务。这种队列是无界的,能够容纳Integer.MAX_VALUE
个元素。 - OOM 风险:由于
DelayedWorkQueue
是无界的,如果大量的延迟任务或定时任务被提交到线程池而未得到及时执行,它们会在队列中累积,最终可能导致内存溢出。
示例代码
// 创建一个缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 创建一个定时任务线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
// 示例:向缓存线程池提交大量短期异步任务
for (int i = 0; i < 10000; i++) {
cachedThreadPool.execute(() -> {
// 执行任务
});
}
// 示例:向定时任务线程池提交大量定时任务
for (int i = 0; i < 10000; i++) {
scheduledThreadPool.schedule(() -> {
// 执行任务
}, 10, TimeUnit.SECONDS);
}
解决方案
为了避免因大量任务提交到这些线程池而导致的潜在内存溢出问题,可以采用以下措施:
- 对于
newCachedThreadPool
,避免提交大量短期异步任务,或者使用带有线程数量限制的线程池,如newFixedThreadPool
。 - 对于
newScheduledThreadPool
,确保提交的定时任务能够在合理的时间内执行完成,避免任务在队列中长时间积压。 - 直接使用
ThreadPoolExecutor
构造函数创建线程池,明确指定线程池参数,包括核心线程数、最大线程数、存活时间、工作队列等,以及合理的拒绝策略,从而细粒度地控制线程池的行为。
通过这些措施,可以有效管理线程池资源,防止因任务积压导致的内存溢出问题。