CyclicBarrier
CyclicBarrier 是 Java 中 java.util.concurrent 包提供的一个同步辅助类,它允许一组线程互相等待,
直到所有线程都达到一个公共屏障点(Common Barrier Point)再继续执行。与 CountDownLatch 相比,
CyclicBarrier 的主要特点是它可以重用,即一旦所有等待线程都被释放,它可以被重置并再次使用。
核心特性
- 同步点:所有线程必须到达屏障点,屏障才会打开,线程才能继续执行。
- 重用性:一旦所有等待线程释放,
CyclicBarrier可以被重置并重新使用。 - 可选的屏障动作:可以在所有线程到达屏障后执行一个屏障动作,这是一个由最后一个到达的线程执行的 Runnable 任务。
使用场景
CyclicBarrier 适用于多线程计算数据,数据部分完成后,需要合并计算结果的场景。例如,多线程进行矩阵乘法计算,每个线程计算矩阵中的一部分,然后合并结果。
示例代码
下面是一个示例,展示了如何使用 CyclicBarrier 来协调多个线程的工作,其中线程完成部分任务后等待其他线程。
package com.jasper.tools.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author jasper
* @since 2026-05-25 12:23:45
*/
public class Task implements Runnable {
private String name;
private CyclicBarrier barrier;
public Task(String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(name + " phase 1 is currently underway");
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + " reach barrier");
barrier.await(); // 所有线程都到达barrier之后才继续执行 can reuse
System.out.println(name + " phase 2 is currently underway");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + " reach barrier");
barrier.await();
System.out.println(name + "end");
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} catch (BrokenBarrierException e) {
System.out.println(e.getMessage());
}
}
}
/**
* @author jasper
* @since 2026-05-25 12:23:39 <br>
* 允许一组线程互相等待, 直到所有线程都达到一个公共屏障点(Common Barrier Point)再继续执行 <br>
* 主要线程协作 例如4个线程各自计算一部分 最后汇总
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier barrier =
new CyclicBarrier(3, () -> System.out.println("all task done 1 phase"));
new Thread(new Task("A", barrier)).start();
new Thread(new Task("B", barrier)).start();
new Thread(new Task("C", barrier)).start();
}
}
import java.util.concurrent.CyclicBarrier;
public class MarketSimulation {
private static final int NUM_AGENTS = 3; // 假设有三个代理:买家、卖家和监管者
private static final CyclicBarrier barrier = new CyclicBarrier(NUM_AGENTS, () -> {
System.out.println("市场同步数据和策略更新...");
});
static class Agent implements Runnable {
private String role;
Agent(String role) {
this.role = role;
}
@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) { // 模拟五个交易日
System.out.println(role + " 完成第 " + i + " 个交易日的交易");
barrier.await(); // 等待其他代理完成
System.out.println(role + " 开始第 " + i + " 个交易日的后市场活动");
}
} catch (Exception e) {
System.out.println(role + " 遇到异常");
}
}
}
public static void main(String[] args) {
String[] roles = {"买家", "卖家", "监管者"};
for (String role : roles) {
new Thread(new Agent(role)).start();
}
}
}
注意事项
- 异常处理:如果任何参与线程在等待过程中被中断或超时,或者
barrier.await()调用时 barrier 被重置或损坏,则会抛出相应的异常。这可能包括InterruptedException,BrokenBarrierException或TimeoutException。 - 屏障破损:如果在任何线程在屏障处等待时,其中一个线程被中断,所有其他等待线程都将抛出
BrokenBarrierException,并且屏障被视为损坏。 - 重置:
CyclicBarrier提供一个reset()方法,可以重置屏障到其初始状态。注意,如果有线程在屏障处等待时调用reset(),这些线程将抛出BrokenBarrierException。
CyclicBarrier 是一种强大的同步辅助类,非常适合于将多个子任务分段执行的场景。正确使用时,可以有效地协调多线程之间的工作流程和数据整合。