Latch 设计模式
Latch(阀门),该模式指定了一个屏障,只有所有的条件都达到满足的时候,门阀才能打开。
首先定义一个无限等待的抽象类 Latch ,在 Latch 抽象类中定义了 await() 方法、 countDown() 方法以及 getUnarrived() 方法,这些方法的用途在代码注释中都有详细介绍,当然在 Latch 中的 limit 属性至关重要,当 limit 降低到 0 时门阀将会被打开。
public abstract class Latch {
//用于控制多少个线程完成任务时才能打开阀门
protected int limit;
//通过构造函数传入limit
public Latch(int limit) {
this.limit = limit;
}
//该方法会使得当前线程一直等待,直到所有的线程都完成工作,被阻塞的线程是允许被中断的
public abstract void await()
throws InterruptedException;
//当任务线程完成工作之后调用该方法使得计数器减一
public abstract void countDown();
//获取当前还有多少个线程没有完成任务
public abstract int getUnarrived();
}
当完成的子任务数量达到 limit 的时候,门阀才能打开, await() 方法用于等待所有的子任务完成,如果到达数量未达到 limit 的时候,将会无限等待下去,当子任务完成的时候调用 countDown() 方法使计数器减少一个,表明我已经完成任务了, getUnarrived() 方法主要用于查询当前有多少个子任务还未结束。
无限等待的 CountDownLatch 实现
CountDownLatch 实现
public class CountDownLatch extends Latch {
public CountDownLatch(int limit) {
super(limit);
}
@Override
public void await()
throws InterruptedException {
synchronized (this) {
//当limit>0时,当前线程进入阻塞状态
while (limit > 0) {
this.wait();
}
}
}
@Override
public void countDown() {
synchronized (this) {
if (limit <= 0)
throw new IllegalStateException("all of task already arrived");
//使limit减一,并且通知阻塞线程
limit--;
this.notifyAll();
}
}
@Override
public int getUnarrived() {
//返回有多少线程还未完成任务
return limit;
}
}
当 limit > 0 时调用 await() 方法的线程将会进入无限的等待。 await() 方法不断判断 limit 的数量,大于 0 时门阀将不能打开,需要持续等待直到 limit 数量为 0 为止; countDown() 方法调用之后会导致 limit-- 操作,并且通知 waiting 的线程再次判断 limit 的值是否等于 0 ,当 limit 被减少到了 0 以下,则抛出状态非法的异常;
getUnarrived()获取当前还有多少个子任务未完成,这个返回值并不一定就是准确的,在多线程的情况下,某个线程在获得 Unarrived 任务数量并且返回之后,有可能limit又被减少,因此getUnarrived()是一个评估值。
测试
public class ProgrammerTravel extends Thread {
//门阀
private final Latch latch;
//程序员
private final String programmer;
//交通工具
private final String transportation;
//通过构造函数传入latch,programmer,transportation
public ProgrammerTravel(Latch latch, String programmer, String transportation) {
this.latch = latch;
this.programmer = programmer;
this.transportation = transportation;
}
@Override
public void run() {
System.out.println(programmer + " start take the transportation [" + transportation + "]");
try {
// 程序员乘坐交通工具花费在路上的时间(使用随机数字模拟)
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(programmer + " arrived by " + transportation);
//完成任务时使计数器减一
latch.countDown();
}
public static void main(String[] args) throws InterruptedException {
Latch latch = new CountDownLatch(4);
new ProgrammerTravel(latch, "Alex", "Bus").start();
new ProgrammerTravel(latch, "Gavin", "Walking").start();
new ProgrammerTravel(latch, "Jack", "Subway").start();
new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
//当前线程(main线程会进入阻塞,直到四个程序员全部都到达目的地)
latch.await();
System.out.println("== all of programmer arrived ==");
}
}
执行 latch.await() 的方法会进入阻塞,知道前面四个子线程都执行过 countdown() 方法之后才能往下继续执行,否则会无限期等待下去。
超时机制
当子线程存在超时执行的可能性是,我们需要为 Latch 设置超时机制。
超时机制实现
Latch 抽象类中增加带时间参数的 await(TimeUnit unit,long time) 方法;
public abstract class Latch {
...
public abstract void await(TimeUnit unit, long time)
throws InterruptedException, WaitTimeoutException;
}
CountDownLatch 中实现超时接口;
@Override
public void await(TimeUnit unit, long time)
throws InterruptedException, WaitTimeoutException {
if (time <= 0)
throw new IllegalArgumentException("The time is invalid.");
long remainingNanos = unit.toNanos(time); //将time转换为纳秒
//等待任务将在endNanos纳秒后超时
final long endNanos = System.nanoTime() + remainingNanos;
synchronized (this) {
while (limit > 0) {
//如果超时则抛出WaitTimeoutException异常
if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0)
throw new WaitTimeoutException("The wait time over specify time.");
//等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
remainingNanos = endNanos - System.nanoTime();
}
}
}
超时机制测试
public static void main(String[] args)
throws InterruptedException {
Latch latch = new CountDownLatch(4);
new ProgrammerTravel(latch, "Alex", "Bus").start();
new ProgrammerTravel(latch, "Gavin", "Walking").start();
new ProgrammerTravel(latch, "Jack", "Subway").start();
new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
try {
latch.await(TimeUnit.SECONDS, 5);
System.out.println("== all of programmer arrived ==");
} catch (WaitTimeoutException e) {
e.printStackTrace();
}
}
程序等待 5s 后,子任务未完成, CountdownLatch 抛出 WaitTimeoutException 异常通知主线程;
小结
Latch 设计模式提供了前置任务完成后再进行后面工作的设计方法,可以用在程序的缓存加载上,提高效率的同时,保证缓存加载完成之前任务阻塞,防止异常。自从 JDK 1.5 起也提供了 CountdownLatch 工具类,其实现是基于阻塞队列,但暴露的接口和上述相同。另外, CountdownLatch 只提供了门阀的阻塞功能,并不负责线程的管理,比如当子任务超时后, Latch 不会让超时的子任务停止,而是需要程序员自己控制任务的关闭。