Skip to content

自定义线程池

线程池的关键参数

创建 ThreadPoolExecutor 时,你需要注意以下几个关键参数:

  • 核心线程数 (corePoolSize):即使空闲,线程池中始终存活的线程数。
  • 最大线程数 (maximumPoolSize):线程池中允许的最大线程数。
  • 空闲线程存活时间 (keepAliveTime):非核心空闲线程在终止前等待新任务的最长时间。
  • 时间单位 (TimeUnit)keepAliveTime 的时间单位。
  • 工作队列 (BlockingQueue<Runnable>):存放待执行任务的队列。
  • 线程工厂 (ThreadFactory):用于创建新线程的工厂。
  • 拒绝策略 (RejectedExecutionHandler):当任务无法提交时的拒绝处理策略。

示例代码:创建线程池并执行任务

java
import java.util.concurrent.*;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                60, // 空闲线程存活时间
                TimeUnit.SECONDS, // 时间单位
                new ArrayBlockingQueue<>(10), // 工作队列
                Executors.defaultThreadFactory(), // 线程工厂
                new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略

        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskID = i;
            threadPool.execute(() -> {
                System.out.println("执行任务:" + taskID + ",线程名:" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        threadPool.shutdown(); // 关闭线程池
    }
}
java
package com.jasper.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), ((r, executor) ->
        {
            System.out.println(r.toString()+"is rejected");
            BlockingQueue<Runnable> queue = executor.getQueue();
            try {
                queue.offer(r, 10, TimeUnit.SECONDS);
                System.out.println("add queue success");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        )
        );
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println("thread is running !!!");
            });
        }
    }
}

线程池的关闭

  • shutdown():不会立即终止线程池,而是不再接受新任务,当所有任务完成后关闭线程池。
  • shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

注意事项

  • 使用合适的线程池类型和参数设置可以显著提升系统性能。
  • 需要合理配置核心线程数和最大线程数,避免资源浪费或过载。
  • 使用适当的工作队列和拒绝策略,以应对系统负载高峰。
  • 注意线程池的关闭,确保应用退出前线程池资源被正确释放。

ThreadFactory

ThreadFactory 是 Java 并发API中的一个接口,位于 java.util.concurrent 包中。它的设计目的是由需求创建新线程时提供一个自定义的线程创建机制。通过实现 ThreadFactory 接口,开发者可以自定义线程的创建过程,如设置线程名、优先级、是否为守护线程等属性。

  • 目的和用途

ThreadFactory 主要用于那些需要自定义线程属性的场景,比如在使用线程池(如 ThreadPoolExecutor)时,通过自定义的 ThreadFactory 可以控制线程池中线程的配置。这样做可以提高程序的可读性和可管理性,同时也方便调试。

  • 实现 ThreadFactory

实现 ThreadFactory 非常简单,只需实现 newThread(Runnable r) 方法,该方法接收一个 Runnable 实例作为参数,并返回一个新的、按照需求配置好的 Thread 实例。

java
import java.util.concurrent.ThreadFactory;

public class SimpleThreadFactory implements ThreadFactory {
    private int threadId = 1;
    private String namePrefix;

    public SimpleThreadFactory(String name) {
        this.namePrefix = "自定义线程工厂-" + name + "-线程-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadId);
        System.out.println("创建了新线程: " + t.getName());
        threadId++;
        return t;
    }
}
  • ThreadPoolExecutor 结合使用

自定义的 ThreadFactory 可以与 ThreadPoolExecutor 结合使用,为线程池定制线程:

java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new SimpleThreadFactory("我的池"));
        executor.execute(() -> {
            System.out.println("任务正在线程中运行: " + Thread.currentThread().getName());
        });
        executor.shutdown();
    }
}
  • 优点

  • 定制化:允许设置自定义名称、优先级等线程属性,有助于调试和管理线程。

  • 灵活性:不同的应用部分或不同的应用程序可以根据其特定需求使用不同的 ThreadFactory 实现。

  • 一致性:确保为特定组件或应用程序创建的所有线程具有一致的属性。

在处理 Java 并发时,特别是在复杂应用程序或系统中,使用 ThreadFactory 是一种最佳实践,可以有效地管理线程属性,提高性能和可维护性。

RejectedExecutionHandler

RejectedExecutionHandler 是 Java 并发框架中的一个接口,用于处理当任务被线程池拒绝添加时的情况。当线程池已经达到其界限,不能接受额外的任务时,就会发生任务拒绝。ThreadPoolExecutor 类提供了四种预定义的拒绝策略,这些策略定义了不同的处理被拒绝任务的方式:

  1. ThreadPoolExecutor.AbortPolicy 这是默认的拒绝策略。当任务被拒绝时,AbortPolicy 策略会抛出一个运行时异常 RejectedExecutionException,直接拒绝新任务的执行。这种策略直接反馈执行失败,使得你可以立即知道哪些任务没有被执行。

  2. ThreadPoolExecutor.CallerRunsPolicy 这种策略不会抛出异常。相反,它会尝试在执行任务的当前线程中直接运行被拒绝的任务。这意味着如果任务被拒绝,将由提交任务的线程(即调用者线程)来直接执行该任务。这种策略提供了一种减轻服务器压力的简单反馈机制,因为它将额外的任务负担放在了提交任务的线程上。

  3. ThreadPoolExecutor.DiscardPolicy 这种策略将静默地忽略被拒绝的任务,不做任何处理。也就是说,如果任务被拒绝,那么这个任务将被丢弃且不会有任何警告或异常。这可能会导致某些任务无法执行,而你却没有任何通知,因此使用时需要谨慎。

  4. ThreadPoolExecutor.DiscardOldestPolicy 这种策略将尝试丢弃最早的未处理请求,然后重试执行程序(也就是尝试再次提交被拒绝的任务)。这种方式旨在为新任务释放空间,假定旧任务可以被放弃。但如果线程池持续饱和,这也可能导致一些任务被丢弃。

  • 选择合适的策略 选择哪种拒绝策略取决于具体应用场景和需求。默认的 AbortPolicy 策略通过抛出异常直接反馈问题,而 CallerRunsPolicy 通过调用者运行任务提供了一种减缓压力的方式。DiscardPolicyDiscardOldestPolicy 可能会导致任务丢失,因此在使用时需要更加谨慎,确保这种丢失不会影响应用程序的正确性。

在实践中,你可能会根据需要实现自定义的 RejectedExecutionHandler 策略,以提供更符合特定应用需求的任务拒绝处理逻辑。