Skip to content

Phaser

Phaser(阶段同步器)是 Java 并发包 java.util.concurrent 中提供的一个同步辅助类,它用于协调多个线程间的复杂多阶段任务的执行。Phaser 是在 Java 7 中引入的,其功能类似于 CyclicBarrierCountDownLatch,但提供了更灵活的控制和动态的线程参与管理。

主要特性

  • 多阶段执行Phaser 支持多阶段计算的执行,每个阶段结束时可以进行同步。
  • 动态线程管理:新的参与者可以在任何阶段注册到 Phaser 中,也可以随时撤销注册,这使得线程的动态加入和退出成为可能。
  • 可重用:与 CyclicBarrier 类似,Phaser 在达到一定的同步点后可以重复使用,支持为接下来的阶段重新配置。

核心方法

  • register():注册一个未参与的线程,使其能够参与 Phaser 同步。
  • arriveAndAwaitAdvance():当前线程到达,并等待其他线程到达同步点。
  • arriveAndDeregister():表示当前线程已完成所有阶段的执行,并从 Phaser 中注销,不再参与后续同步。
  • bulkRegister(int parties):一次性注册多个参与者。
graph LR;
    A[开始] --> B[线程注册]
    B --> C[第一阶段]
    C -->|等待其他线程| D[达到屏障点]
    D --> E[第二阶段]
    E -->|等待其他线程| F[达到屏障点]
    F --> G[第三阶段]
    G -->|等待其他线程| H[达到屏障点]
    H --> I[线程完成并注销]
    I --> J[结束]

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style J fill:#ccf,stroke:#333,stroke-width:2px
    style D fill:#ff9,stroke:#333,stroke-width:2px
    style F fill:#ff9,stroke:#333,stroke-width:2px
    style H fill:#ff9,stroke:#333,stroke-width:2px

示例代码

以下示例演示如何使用 Phaser 来协调一组线程的多阶段任务执行。假设我们有一个需要多个线程协作执行多个任务阶段的场景。

java
import java.util.concurrent.Phaser;

public class PhaserExample {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(1); // 注册主线程,初始有1个参与者

        System.out.println("启动任务");

        for (int i = 1; i <= 5; i++) { // 创建5个任务
            final int taskNo = i;
            phaser.register(); // 为每个任务注册一个参与者
            new Thread(() -> {
                System.out.println("任务" + taskNo + " 第一阶段执行");
                phaser.arriveAndAwaitAdvance(); // 完成第一阶段并等待

                System.out.println("任务" + taskNo + " 第二阶段执行");
                phaser.arriveAndAwaitAdvance(); // 完成第二阶段并等待

                System.out.println("任务" + taskNo + " 完成");
                phaser.arriveAndDeregister(); // 任务完成,注销
            }).start();
        }

        // 等待所有任务完成第一阶段
        phaser.arriveAndAwaitAdvance();
        System.out.println("所有任务完成第一阶段");

        // 等待所有任务完成第二阶段
        phaser.arriveAndAwaitAdvance();
        System.out.println("所有任务完成第二阶段");

        // 主线程注销,任务全部完成
        phaser.arriveAndDeregister();
        System.out.println("所有任务完成");
    }
}
java
import java.util.concurrent.Phaser;

public class DataProcessingPhaser {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(1); // 注册主线程

        System.out.println("数据处理开始");

        for (int i = 1; i <= 10; i++) { // 假设有10个数据处理任务
            int finalI = i;
            phaser.register(); // 为每个任务注册一个参与者
            new Thread(() -> {
                System.out.println("数据预处理 " + finalI);
                phaser.arriveAndAwaitAdvance(); // 完成预处理阶段并等待

                System.out.println("数据分析 " + finalI);
                phaser.arriveAndAwaitAdvance(); // 完成分析阶段并等待

                System.out.println("数据汇总 " + finalI);
                phaser.arriveAndDeregister(); // 完成汇总,注销
            }).start();
        }

        // 主线程在每个阶段也必须到达并等待
        phaser.arriveAndAwaitAdvance(); // 等待预处理阶段完成
        System.out.println("预处理阶段完成");

        phaser.arriveAndAwaitAdvance(); // 等待分析阶段完成
        System.out.println("分析阶段完成");

        // 所有任务都已完成最后的汇总阶段
        phaser.arriveAndDeregister(); // 主线程注销
        System.out.println("所有数据处理完成");
    }
}

使用注意事项

  1. 正确注册和注销:确保每个参与线程正确注册并在完成后注销。如果忘记注销,Phaser 可能无法正常前进到下一个阶段。
  2. 管理阶段PhasergetPhase() 方法可以获取当前阶段,这对调试和监控是有帮助的。
  3. 动态参与Phaser 支持动态的线程参与,允许在任何阶段增加或减少线程的数量,这为编写动态的并行程序提供了便利。

Phaser 提供的灵活性使其成为处理多阶段任务时的理想选择,尤其是当任务数量或参与线程数可能动态变化时。这种特性使得 Phaser 成为并

发编程中一个非常有用的工具。