专栏名称: 狗厂
目录
相关文章推荐
康石石  ·  我跨专业拿到了世界第一的offer!!! ·  昨天  
康石石  ·  伦时fashion ... ·  2 天前  
51好读  ›  专栏  ›  狗厂

高并发编程-CountDownLatch深入解析

狗厂  · 掘金  ·  · 2018-05-14 06:51

正文

若文中代码格式阅读困难,可点击文末"阅读原文"链接友好阅读。

要点解说

CountDownLatch允许一个或者多个线程一直等待,直到一组其它操作执行完成。在使用CountDownLatch时,需要指定一个整数值,此值是线程将要等待的操作数。当某个线程为了要执行这些操作而等待时,需要调用await方法。await方法让线程进入休眠状态直到所有等待的操作完成为止。当等待的某个操作执行完成,它使用countDown方法来减少CountDownLatch类的内部计数器。当内部计数器递减为0时,CountDownLatch会唤醒所有调用await方法而休眠的线程们。

实例演示

下面代码演示了CountDownLatch简单使用。演示的场景是5位运动员参加跑步比赛,发令枪打响后,5个计时器开始分别计时,直到所有运动员都到达终点。

  1. public class CountDownLatchDemo {

  2.    public static void main(String[] args) {

  3.        Timer timer = new Timer(5);

  4.        new Thread(timer).start();

  5.        for (int athleteNo = 0; athleteNo < 5; athleteNo++) {

  6.            new Thread(new Athlete(timer, "athlete" + athleteNo)).start();

  7.        }

  8.    }

  9. }

  10. class Timer implements Runnable {

  11.    CountDownLatch timerController;

  12.    public Timer(int numOfAthlete) {

  13.        this.timerController = new CountDownLatch(numOfAthlete);

  14.    }    

  15.    public void recordResult(String athleteName) {

  16.        System.out.println(athleteName + " has arrived");

  17.        timerController.countDown();

  18.        System.out.println("There are " + timerController.getCount() + " athletes did not reach the end");

  19.    }    

  20.    @Override

  21.    public void run() {

  22.        try {

  23.            System.out.println("Start...");

  24.            timerController.await();

  25.            System.out.println("All the athletes have arrived");

  26.        } catch (InterruptedException e) {

  27.            e.printStackTrace();

  28.        }

  29.    }

  30. }

  31. class Athlete implements Runnable {

  32.    Timer timer;

  33.    String athleteName;    

  34.    public Athlete(Timer timer, String athleteName) {

  35.        this.timer = timer;

  36.        this.athleteName = athleteName;

  37.    }    

  38.    @Override

  39.    public void run() {

  40.        try {

  41.            System.out.println(athleteName + " start running");

  42.            long duration = (long) (Math.random() * 10);

  43.            Thread.sleep(duration * 1000);

  44.            timer.recordResult(athleteName);

  45.        } catch (InterruptedException e) {

  46.            e.printStackTrace();

  47.        }

  48.    }

  49. }

输出结果如下所示:

  1.  Start...

  2. athlete0 start running

  3. athlete1 start running

  4. athlete2 start running

  5. athlete3 start running

  6. athlete4 start running

  7. athlete0 has arrived

  8. There are 4 athletes did not reach the end

  9. athlete3 has arrived

  10. There are 3 athletes did not reach the end

  11. athlete2 has arrived

  12. athlete1 has arrived

  13. There are 1 athletes did not reach the end

  14. There are 2 athletes did not reach the end

  15. athlete4 has arrived

  16. There are 0 athletes did not reach the end

  17. All the athletes have arrived

方法解析

1.构造方法 CountDownLatch(int count)构造一个指定计数的CountDownLatch,count为线程将要等待的操作数。

2.await() 调用await方法后,使当前线程在锁存器(内部计数器)倒计数至零之前一直等待,进入休眠状态,除非线程被中断。如果当前计数递减为零,则此方法立即返回,继续执行。

3.await(long timeout, TimeUnit unit) 调用await方法后,使当前线程在锁存器(内部计数器)倒计数至零之前一直等待,进入休眠状态,除非线程被 中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。

3.acountDown() acountDown方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。

4.getCount() 调用此方法后,返回当前计数,即还未完成的操作数,此方法通常用于调试和测试。

源码解析

进入源码分析之前先看一下CountDownLatch的类图,

Sync是CountDownLatch的一个内部类,它继承了AbstractQueuedSynchronizer。

CountDownLatch(int count)、await()和countDown()三个方法是CountDownLatch的核心方法,本篇将深入分析这三个方法的具体实现原理。

1.CountDownLatch(int count)

  1.       public CountDownLatch(int count) {

  2.        if (count < 0) throw new IllegalArgumentException("count < 0");

  3.        this.sync = new Sync(count);

  4.    }

该构造方法根据给定count参数构造一个CountDownLatch,内部创建了一个Sync实例。Sync是CountDownLatch的一个内部类,其构造方法代码如下:

  1.       Sync(int count) {

  2.        setState(count);

  3.    }

setState方法继承自AQS,给Sync实例的state属性赋值。

  1.       protected final void setState(int newState) {

  2.        state = newState;

  3.    }

这个state就是CountDownLatch的内部计数器。

2.await() 当await()方法被调用时,当前线程会阻塞,直到内部计数器的值等于零或当前线程被中断,下面深入代码分析。

  1.       public void await() throws InterruptedException {

  2.        sync.acquireSharedInterruptibly(1);

  3.    }    

  4.    public final void acquireSharedInterruptibly(int arg)

  5.            throws InterruptedException {

  6.        //如果当前线程中断,则抛出InterruptedException

  7.        if (Thread.interrupted())

  8.            throw new InterruptedException();            

  9.        //尝试获取共享锁,如果可以获取到锁直接返回;

  10.        //如果获取不到锁,执行doAcquireSharedInterruptibly

  11.        if (tryAcquireShared(arg) < 0)

  12.            doAcquireSharedInterruptibly(arg);

  13.    }    

  14.    //如果当前内部计数器等于零返回1,否则返回-1;

  15.    //内部计数器等于零表示可以获取共享锁,否则不可以;

  16.    protected int tryAcquireShared(int acquires) {

  17.        return (getState() == 0) ? 1 : -1;

  18.    }    

  19.    //返回内部计数器当前值

  20.    protected final int getState() {

  21.        return state;

  22.    }    

  23.    //该方法使当前线程一直等待,直到当前线程获取到共享锁或被中断才返回

  24.    private void doAcquireSharedInterruptibly(int arg)

  25.        throws InterruptedException {

  26.        //根据当前线程创建一个共享模式的Node节点

  27.        //并把这个节点添加到等待队列的尾部

  28.        //AQS等待队列不熟悉的可以查看AQS深入解析的内容

  29.        final Node node = addWaiter(Node.SHARED);

  30.        boolean failed = true;

  31.        try {

  32.            for (;;) {

  33.                //获取新建节点的前驱节点

  34.                final Node p = node.predecessor();

  35.                //如果前驱节点是头结点

  36.                if (p == head) {

  37.                    //尝试获取共享锁

  38.                    int r = tryAcquireShared(arg);

  39.                    //获取到共享锁

  40.                    if (r >= 0) {

  41.                        //将前驱节点从等待队列中释放

  42.                        //同时使用LockSupport.unpark方法唤醒前驱节点的后继节点中的线程

  43.                        setHeadAndPropagate(node, r);

  44.                        p.next = null; // help GC

  45.                        failed = false;

  46.                        return;

  47.                    }

  48.                }

  49.                //当前节点的前驱节点不是头结点,或不可以获取到锁

  50.                //shouldParkAfterFailedAcquire方法检查当前节点在获取锁失败后是否要被阻塞







请到「今天看啥」查看全文