Executor
为什么要有线程池
- 降低资源损耗
- 提高响应速度
- 提高线程的可管理性
Java 线程池是通过 Executor
框架实现的,主要包括 Executor
接口及其实现类, 如 ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
,以及工具类 Executors
。 线程池的使用可以减少在执行大量异步任务时创建和销毁线程的开销,提高系统的响应速度和运行效率。
核心组成
Executor
接口:定义了执行提交的Runnable
任务的方法。ExecutorService
接口:是Executor
的子接口,添加了生命周期管理的方法,如shutdown()
。ScheduledExecutorService
接口:扩展了ExecutorService
,支持定时及周期性任务执行。ThreadPoolExecutor
类:ExecutorService
的实现,提供了创建各种线程池的功能。ScheduledThreadPoolExecutor
类:继承ThreadPoolExecutor
类,并实现了ScheduledExecutorService
接口,支持定时及周期性任务执行。Executors
类:工具类,提供静态方法快速创建不同类型的线程池。
线程池的关键参数
创建 ThreadPoolExecutor
时,你需要注意以下几个关键参数:
- 核心线程数 (
corePoolSize
):即使空闲,线程池中始终存活的线程数。 - 最大线程数 (
maximumPoolSize
):线程池中允许的最大线程数。 - 空闲线程存活时间 (
keepAliveTime
):非核心空闲线程在终止前等待新任务的最长时间。 - 时间单位 (
TimeUnit
):keepAliveTime
的时间单位。 - 工作队列 (
BlockingQueue<Runnable>
):存放待执行任务的队列。 - 线程工厂 (
ThreadFactory
):用于创建新线程的工厂。 - 拒绝策略 (
RejectedExecutionHandler
):当任务无法提交时的拒绝处理策略。
示例代码:创建线程池并执行任务
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskID = i;
threadPool.execute(() -> {
System.out.println("执行任务:" + taskID + ",线程名:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown(); // 关闭线程池
}
}
package com.jasper.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), ((r, executor) ->
{
System.out.println(r.toString()+"is rejected");
BlockingQueue<Runnable> queue = executor.getQueue();
try {
queue.offer(r, 10, TimeUnit.SECONDS);
System.out.println("add queue success");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
)
);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
System.out.println("thread is running !!!");
});
}
}
}
线程池的关闭
shutdown()
:不会立即终止线程池,而是不再接受新任务,当所有任务完成后关闭线程池。shutdownNow()
:试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
注意事项
- 使用合适的线程池类型和参数设置可以显著提升系统性能。
- 需要合理配置核心线程数和最大线程数,避免资源浪费或过载。
- 使用适当的工作队列和拒绝策略,以应对系统负载高峰。
- 注意线程池的关闭,确保应用退出前线程池资源被正确释放。
ThreadFactory
ThreadFactory
是 Java 并发API中的一个接口,位于 java.util.concurrent
包中。它的设计目的是由需求创建新线程时提供一个自定义的线程创建机制。通过实现 ThreadFactory
接口,开发者可以自定义线程的创建过程,如设置线程名、优先级、是否为守护线程等属性。
- 目的和用途
ThreadFactory
主要用于那些需要自定义线程属性的场景,比如在使用线程池(如 ThreadPoolExecutor
)时,通过自定义的 ThreadFactory
可以控制线程池中线程的配置。这样做可以提高程序的可读性和可管理性,同时也方便调试。
- 实现
ThreadFactory
实现 ThreadFactory
非常简单,只需实现 newThread(Runnable r)
方法,该方法接收一个 Runnable
实例作为参数,并返回一个新的、按照需求配置好的 Thread
实例。
import java.util.concurrent.ThreadFactory;
public class SimpleThreadFactory implements ThreadFactory {
private int threadId = 1;
private String namePrefix;
public SimpleThreadFactory(String name) {
this.namePrefix = "自定义线程工厂-" + name + "-线程-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadId);
System.out.println("创建了新线程: " + t.getName());
threadId++;
return t;
}
}
- 与
ThreadPoolExecutor
结合使用
自定义的 ThreadFactory
可以与 ThreadPoolExecutor
结合使用,为线程池定制线程:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class ExecutorExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new SimpleThreadFactory("我的池"));
executor.execute(() -> {
System.out.println("任务正在线程中运行: " + Thread.currentThread().getName());
});
executor.shutdown();
}
}
优点
定制化:允许设置自定义名称、优先级等线程属性,有助于调试和管理线程。
灵活性:不同的应用部分或不同的应用程序可以根据其特定需求使用不同的
ThreadFactory
实现。一致性:确保为特定组件或应用程序创建的所有线程具有一致的属性。
在处理 Java 并发时,特别是在复杂应用程序或系统中,使用 ThreadFactory
是一种最佳实践,可以有效地管理线程属性,提高性能和可维护性。
RejectedExecutionHandler
RejectedExecutionHandler
是 Java 并发框架中的一个接口,用于处理当任务被线程池拒绝添加时的情况。当线程池已经达到其界限,不能接受额外的任务时,就会发生任务拒绝。ThreadPoolExecutor
类提供了四种预定义的拒绝策略,这些策略定义了不同的处理被拒绝任务的方式:
ThreadPoolExecutor.AbortPolicy
这是默认的拒绝策略。当任务被拒绝时,AbortPolicy
策略会抛出一个运行时异常RejectedExecutionException
,直接拒绝新任务的执行。这种策略直接反馈执行失败,使得你可以立即知道哪些任务没有被执行。ThreadPoolExecutor.CallerRunsPolicy
这种策略不会抛出异常。相反,它会尝试在执行任务的当前线程中直接运行被拒绝的任务。这意味着如果任务被拒绝,将由提交任务的线程(即调用者线程)来直接执行该任务。这种策略提供了一种减轻服务器压力的简单反馈机制,因为它将额外的任务负担放在了提交任务的线程上。ThreadPoolExecutor.DiscardPolicy
这种策略将静默地忽略被拒绝的任务,不做任何处理。也就是说,如果任务被拒绝,那么这个任务将被丢弃且不会有任何警告或异常。这可能会导致某些任务无法执行,而你却没有任何通知,因此使用时需要谨慎。ThreadPoolExecutor.DiscardOldestPolicy
这种策略将尝试丢弃最早的未处理请求,然后重试执行程序(也就是尝试再次提交被拒绝的任务)。这种方式旨在为新任务释放空间,假定旧任务可以被放弃。但如果线程池持续饱和,这也可能导致一些任务被丢弃。
- 选择合适的策略 选择哪种拒绝策略取决于具体应用场景和需求。默认的
AbortPolicy
策略通过抛出异常直接反馈问题,而CallerRunsPolicy
通过调用者运行任务提供了一种减缓压力的方式。DiscardPolicy
和DiscardOldestPolicy
可能会导致任务丢失,因此在使用时需要更加谨慎,确保这种丢失不会影响应用程序的正确性。
在实践中,你可能会根据需要实现自定义的 RejectedExecutionHandler
策略,以提供更符合特定应用需求的任务拒绝处理逻辑。
Executors
Executors
是 Java 并发包中的一个工具类,提供了若干静态方法来创建不同类型的线程池。这些线程池对于执行大量异步任务提供了高效的管理方式。下面,我们将详细介绍 Executors
类提供的几种常用线程池,以及它们的使用场景和代码示例。
固定大小的线程池(Fixed Thread Pool)
- 使用场景:适用于负载较重的服务器。
- 创建方式:
Executors.newFixedThreadPool(int nThreads)
。 - 特点:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(() -> {
// 执行任务
});
}
fixedThreadPool.shutdown();
缓存线程池(Cached Thread Pool)
- 使用场景:适用于执行很多短期异步任务的程序,或者是负载较轻的服务器。
- 创建方式:
Executors.newCachedThreadPool()
。 - 特点:创建一个可根据需要创建新线程的线程池,但是在先前构建的线程可用时将重用它们。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(() -> {
// 执行任务
});
}
cachedThreadPool.shutdown();
单线程化的线程池(Single Thread Executor)
- 使用场景:适用于需要保证顺序执行各个任务的场景,并且在任意时间点,不会有多个线程是活动的。
- 创建方式:
Executors.newSingleThreadExecutor()
。 - 特点:创建一个单线程的执行器。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.execute(() -> {
// 执行任务
});
}
singleThreadExecutor.shutdown();
定时及周期性任务执行的线程池(Scheduled Thread Pool)
- 使用场景:适用于需要多个后台线程执行周期任务,同时作为定时任务的调度器。
- 创建方式:
Executors.newScheduledThreadPool(int corePoolSize)
。 - 特点:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(() -> {
// 执行任务
}, 5, TimeUnit.SECONDS);
scheduledThreadPool.shutdown();
注意事项
虽然 Executors
提供了便捷的方式来快速创建线程池,但在生产环境中,直接使用这些方法创建的线程池可能并不合适。比如,newCachedThreadPool
和 newScheduledThreadPool
默认创建的都是具有 Integer.MAX_VALUE 的队列,可能会导致 OOM(内存溢出)。newFixedThreadPool
和 newSingleThreadExecutor
默认的队列是无界的,也可能会导致 OOM。因此,在实际应用中,根据需要创建具有合理配置的 ThreadPoolExecutor
实例通常是更好的选择。
在 Java 并发编程中,当我们通过 Executors
类的 newFixedThreadPool(int nThreads)
和 newSingleThreadExecutor()
方法创建线程池时, 默认使用的工作队列是无界的。这里所说的“无界的”指的是队列的容量没有固定的上限,理论上可以无限制地添加任务。 默认情况下,这两种类型的线程池使用的是 LinkedBlockingQueue
作为其工作队列。
LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表结构的阻塞队列,除非系统资源耗尽,否则它可以无限制地接受任务。这意味着,如果任务提交速度持续超过线程池处理速度,队列会持续增长,最终可能导致内存溢出(OOM)。
示例
下面是一个简单的例子,展示了如何使用 newFixedThreadPool
创建一个固定大小的线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10000; i++) {
final int taskID = i;
fixedThreadPool.execute(() -> {
System.out.println("执行任务: " + taskID);
});
}
在这个例子中,如果这 10000 个任务被迅速提交到线程池,而线程池内部处理这些任务的速度跟不上提交的速度, 那么这些待处理的任务就会被存放在 LinkedBlockingQueue
中,队列会持续增长。
风险与解决方案
- 风险:在高负载的场景下,使用无界队列可能会导致高内存消耗,甚至
OutOfMemoryError
。 - 解决方案:
- 使用有界队列,如
ArrayBlockingQueue
,在创建ThreadPoolExecutor
时指定容量上限。 - 根据实际场景合理配置线程池的核心线程数和最大线程数,以及队列大小,避免任务积压。
- 对于超出队列容量的任务,可以采取一定的拒绝策略(如
ThreadPoolExecutor.CallerRunsPolicy
),避免无限制地提交任务。
- 使用有界队列,如
通过合理配置和管理线程池,可以有效避免因任务积压导致的内存溢出风险,确保应用的稳定性和性能。
在 Java 的 Executors
工具类中,newCachedThreadPool()
和 newScheduledThreadPool(int corePoolSize)
方法创建的线程池行为与队列大小有着直接的关系,尤其是在任务提交速度远大于处理速度的场景下。这两种类型的线程池在处理任务时使用的队列和任务调度方式有所不同,它们对内存的潜在影响也各有特点。
newCachedThreadPool
- 队列特性:
newCachedThreadPool()
方法创建的线程池实际上使用的是SynchronousQueue
。SynchronousQueue
并不是一个真正的队列,因为它不会持有元素。每个put
操作必须等待一个take
操作,反之亦然。所以,newCachedThreadPool
的队列实际上并不存储任务。 - OOM 风险:尽管
newCachedThreadPool
使用的是SynchronousQueue
,OOM 风险并不是由队列无界引起的,而是因为这种类型的线程池允许创建的线程数量理论上到达Integer.MAX_VALUE
,如果大量短期异步任务不断提交,而每个任务都创建新线线程执行,就可能导致创建过多线程,从而耗尽系统资源。
newScheduledThreadPool
- 队列特性:
newScheduledThreadPool
方法创建的线程池内部使用的是DelayedWorkQueue
,专门用于处理延迟任务或定时任务。这种队列是无界的,能够容纳Integer.MAX_VALUE
个元素。 - OOM 风险:由于
DelayedWorkQueue
是无界的,如果大量的延迟任务或定时任务被提交到线程池而未得到及时执行,它们会在队列中累积,最终可能导致内存溢出。
示例代码
// 创建一个缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 创建一个定时任务线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
// 示例:向缓存线程池提交大量短期异步任务
for (int i = 0; i < 10000; i++) {
cachedThreadPool.execute(() -> {
// 执行任务
});
}
// 示例:向定时任务线程池提交大量定时任务
for (int i = 0; i < 10000; i++) {
scheduledThreadPool.schedule(() -> {
// 执行任务
}, 10, TimeUnit.SECONDS);
}
解决方案
为了避免因大量任务提交到这些线程池而导致的潜在内存溢出问题,可以采用以下措施:
- 对于
newCachedThreadPool
,避免提交大量短期异步任务,或者使用带有线程数量限制的线程池,如newFixedThreadPool
。 - 对于
newScheduledThreadPool
,确保提交的定时任务能够在合理的时间内执行完成,避免任务在队列中长时间积压。 - 直接使用
ThreadPoolExecutor
构造函数创建线程池,明确指定线程池参数,包括核心线程数、最大线程数、存活时间、工作队列等,以及合理的拒绝策略,从而细粒度地控制线程池的行为。
通过这些措施,可以有效管理线程池资源,防止因任务积压导致的内存溢出问题。