Skip to content
Go back

Latch 设计模式

Edit page

Latch 设计模式

Latch(阀门),该模式指定了一个屏障,只有所有的条件都达到满足的时候,门阀才能打开。

首先定义一个无限等待的抽象类 Latch ,在 Latch 抽象类中定义了 await() 方法、 countDown() 方法以及 getUnarrived() 方法,这些方法的用途在代码注释中都有详细介绍,当然在 Latch 中的 limit 属性至关重要,当 limit 降低到 0 时门阀将会被打开。

public abstract class Latch {
    //用于控制多少个线程完成任务时才能打开阀门
    protected int limit;

    //通过构造函数传入limit
    public Latch(int limit) {
        this.limit = limit;
    }

    //该方法会使得当前线程一直等待,直到所有的线程都完成工作,被阻塞的线程是允许被中断的
    public abstract void await()
            throws InterruptedException;

    //当任务线程完成工作之后调用该方法使得计数器减一
    public abstract void countDown();

    //获取当前还有多少个线程没有完成任务
    public abstract int getUnarrived();
}

当完成的子任务数量达到 limit 的时候,门阀才能打开, await() 方法用于等待所有的子任务完成,如果到达数量未达到 limit 的时候,将会无限等待下去,当子任务完成的时候调用 countDown() 方法使计数器减少一个,表明我已经完成任务了, getUnarrived() 方法主要用于查询当前有多少个子任务还未结束。

无限等待的 CountDownLatch 实现

CountDownLatch 实现

public class CountDownLatch extends Latch {
    public CountDownLatch(int limit) {
        super(limit);
    }

    @Override
    public void await()
            throws InterruptedException {
        synchronized (this) {
            //当limit>0时,当前线程进入阻塞状态
            while (limit > 0) {
                this.wait();
            }
        }
    }

    @Override
    public void countDown() {
        synchronized (this) {
            if (limit <= 0)
                throw new IllegalStateException("all of task already arrived");
            //使limit减一,并且通知阻塞线程
            limit--;
            this.notifyAll();
        }
    }

    @Override
    public int getUnarrived() {
        //返回有多少线程还未完成任务
        return limit;
    }
}

limit > 0 时调用 await() 方法的线程将会进入无限的等待。 await() 方法不断判断 limit 的数量,大于 0 时门阀将不能打开,需要持续等待直到 limit 数量为 0 为止; countDown() 方法调用之后会导致 limit-- 操作,并且通知 waiting 的线程再次判断 limit 的值是否等于 0 ,当 limit 被减少到了 0 以下,则抛出状态非法的异常;

getUnarrived() 获取当前还有多少个子任务未完成,这个返回值并不一定就是准确的,在多线程的情况下,某个线程在获得 Unarrived 任务数量并且返回之后,有可能 limit 又被减少,因此 getUnarrived() 是一个评估值。

测试

public class ProgrammerTravel extends Thread {
    //门阀
    private final Latch latch;
    //程序员
    private final String programmer;
    //交通工具
    private final String transportation;

    //通过构造函数传入latch,programmer,transportation
    public ProgrammerTravel(Latch latch, String programmer, String transportation) {
        this.latch = latch;
        this.programmer = programmer;
        this.transportation = transportation;
    }

    @Override
    public void run() {
        System.out.println(programmer + " start take the transportation [" + transportation + "]");
        try {
            // 程序员乘坐交通工具花费在路上的时间(使用随机数字模拟)
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(programmer + " arrived by " + transportation);
        //完成任务时使计数器减一
        latch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        Latch latch = new CountDownLatch(4);
        new ProgrammerTravel(latch, "Alex", "Bus").start();
        new ProgrammerTravel(latch, "Gavin", "Walking").start();
        new ProgrammerTravel(latch, "Jack", "Subway").start();
        new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
        //当前线程(main线程会进入阻塞,直到四个程序员全部都到达目的地)
        latch.await();
        System.out.println("== all of programmer arrived ==");
    }
}

执行 latch.await() 的方法会进入阻塞,知道前面四个子线程都执行过 countdown() 方法之后才能往下继续执行,否则会无限期等待下去。

超时机制

当子线程存在超时执行的可能性是,我们需要为 Latch 设置超时机制。

超时机制实现

Latch 抽象类中增加带时间参数的 await(TimeUnit unit,long time) 方法;

public abstract class Latch {

    ...

    public abstract void await(TimeUnit unit, long time)
    throws InterruptedException, WaitTimeoutException;
}

CountDownLatch 中实现超时接口;

@Override
public void await(TimeUnit unit, long time)
        throws InterruptedException, WaitTimeoutException {
    if (time <= 0)
        throw new IllegalArgumentException("The time is invalid.");
    long remainingNanos = unit.toNanos(time); //将time转换为纳秒
    //等待任务将在endNanos纳秒后超时
    final long endNanos = System.nanoTime() + remainingNanos;
    synchronized (this) {
        while (limit > 0) {
            //如果超时则抛出WaitTimeoutException异常
            if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0)
                throw new WaitTimeoutException("The wait time over specify time.");
            //等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
            this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            remainingNanos = endNanos - System.nanoTime();
        }
    }
}

超时机制测试

public static void main(String[] args)
        throws InterruptedException {
    Latch latch = new CountDownLatch(4);
    new ProgrammerTravel(latch, "Alex", "Bus").start();
    new ProgrammerTravel(latch, "Gavin", "Walking").start();
    new ProgrammerTravel(latch, "Jack", "Subway").start();
    new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
    try {
        latch.await(TimeUnit.SECONDS, 5);
        System.out.println("== all of programmer arrived ==");
    } catch (WaitTimeoutException e) {
        e.printStackTrace();
    }
}

程序等待 5s 后,子任务未完成, CountdownLatch 抛出 WaitTimeoutException 异常通知主线程;

小结

Latch 设计模式提供了前置任务完成后再进行后面工作的设计方法,可以用在程序的缓存加载上,提高效率的同时,保证缓存加载完成之前任务阻塞,防止异常。自从 JDK 1.5 起也提供了 CountdownLatch 工具类,其实现是基于阻塞队列,但暴露的接口和上述相同。另外, CountdownLatch 只提供了门阀的阻塞功能,并不负责线程的管理,比如当子任务超时后, Latch 不会让超时的子任务停止,而是需要程序员自己控制任务的关闭。


Edit page
Share this post on:

Previous Post
机器数的表示
Next Post
Java 三元表达式的隐式类型转换