专栏名称: 老马说编程
从入门到高级, 深入浅出, 老马和你一起探索编程及计算机技术的本质, 篇篇原创, 用心写作。
目录
相关文章推荐
程序员的那些事  ·  微软紧急下架!900 万次下载的知名 ... ·  2 天前  
OSC开源社区  ·  推理中心化:构建未来AI基础设施的关键 ·  5 天前  
51好读  ›  专栏  ›  老马说编程

(81) 并发同步协作工具 / 计算机程序的思维逻辑

老马说编程  · 公众号  · 程序员  · 2017-03-21 08:43

正文

查看历史文章,请点击上方链接关注公众号。



我们在 67节 68节 实现了线程的一些基本协作机制,那是利用基本的wait/notify实现的,我们提到,Java并发包中有一些专门的同步工具类,本节,我们就来探讨它们。


我们要探讨的工具类包括:

  • 读写锁ReentrantReadWriteLock

  • 信号量Semaphore

  • 倒计时门栓CountDownLatch

  • 循环栅栏CyclicBarrier


71节 介绍的显示锁和 72节 介绍的显示条件类似,它们也都是基于AQS实现的,AQS可参看71节。在一些特定的同步协作场景中,相比使用最基本的wait/notify,显示锁/条件,它们更为方便,效率更高。下面,我们就来探讨它们的基本概念、用法、用途和基本原理。


读写锁ReentrantReadWriteLock

之前章节我们介绍了两种锁, 66节 介绍了synchronized, 71节 介绍了显示锁ReentrantLock。对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。


怎么让读操作能够并行,又不影响一致性呢?答案是使用读写锁。在Java并发包中,接口ReadWriteLock表示读写锁,主要实现类是可重入读写锁ReentrantReadWriteLock。


ReadWriteLock的定义为:

public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}


通过一个ReadWriteLock产生两个锁,一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。


需要注意的是, 只有"读-读"操作是可以并行的,"读-写"和"写-写"都不可以。 只有一个线程可以进行写操作,在获取写锁时,只有没有任何线程持有任何锁才可以获取到,在持有写锁时,其他任何线程都获取不到任何锁。在没有其他线程持有写锁的情况下,多个线程可以获取和持有读锁。


ReentrantReadWriteLock是可重入的读写锁,它有两个构造方法,如下所示:

public ReentrantLock()

public ReentrantLock(boolean fair)


fire表示是否公平,不传递的话是false,含义与 显式锁一节 介绍的类似,就不赘述了。


我们看个简单的例子,使用ReentrantReadWriteLock实现一个缓存类MyCache,代码如下:

public class MyCache {
private Map map = new HashMap();
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();

public Object get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}

public Object put(String key, Object value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}

public void clear() {
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
}


代码比较简单,就不赘述了。


读写锁是怎么实现的呢?读锁和写锁看上去是两个锁,它们是怎么协调的?具体实现比较复杂,我们简述下其思路。


内部,它们使用同一个整数变量表示锁的状态,16位给读锁用,16位给写锁用,使用一个变量便于进行CAS操作, 锁的等待队列其实也只有一个


写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。


读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了0,如果是,唤醒等待队列中的下一个线程。


信号量Semaphore

之前介绍的锁都是限制只有一个线程可以同时访问一个资源。现实中,资源往往有多个,但每个同时只能被一个线程访问,比如,饭店的饭桌、火车上的卫生间。有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。还有的情况,与软件的授权和计费有关,对不同等级的账户,限制不同的最大并发访问数。


信号量类Semaphore就是用来解决这类问题的,它可以限制对资源的并发访问数,它有两个构造方法:

public Semaphore(int permits)

public Semaphore(int permits, boolean fair)


fire表示公平,含义与之前介绍的是类似的,permits表示许可数量。


Semaphore的方法与锁是类似的,主要的方法有两类,获取许可和释放许可,主要方法有:

//阻塞获取许可

public void acquire() throws InterruptedException

//阻塞获取许可,不响应中断

public void acquireUninterruptibly()

//批量获取多个许可

public void acquire(int permits) throws InterruptedException

public void acquireUninterruptibly(int permits)

//尝试获取

public boolean tryAcquire()

//限定等待时间获取

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException

//释放许可

public void release()


我们看个简单的示例,限制并发访问的用户数不超过100,代码如下:

public class AccessControlService {
public static class ConcurrentLimitException extends RuntimeException {
private static final long serialVersionUID = 1L;
}

private static final int MAX_PERMITS = 100;
private Semaphore permits = new Semaphore(MAX_PERMITS, true);

public boolean login(String name, String password) {
if (!permits.tryAcquire()) {
// 同时登录用户数超过限制
throw new ConcurrentLimitException();
}
// ..其他验证
return true;
}

public void logout(String name) {
permits.release();
}
}


代码比较简单,就不赘述了。


需要说明的是,如果我们将permits的值设为1,你可能会认为它就变成了一般的锁,不过,它与一般的锁是不同的。 一般锁只能由持有锁的线程释放,而Semaphore表示的只是一个许可数,任意线程都可以调用其release方法 。主要的锁实现类ReentrantLock是可重入的,而Semaphore不是,每一次的acquire调用都会消耗一个许可,比如,看下面代码段:

Semaphore permits = new Semaphore(1);
permits.acquire();
permits.acquire();
System.out.println("acquired");


程序会阻塞在第二个acquire调用,永远都不会输出"acquired"。


信号量的基本原理比较简单,也是基于AQS实现的,permits表示共享的锁个数,acquire方法就是检查锁个数是否大于0,大于则减一,获取成功,否则就等待,release就是将锁个数加一,唤醒第一个等待的线程。


倒计时门栓CountDownLatch

我们在 68节 使用wait/notify实现了一个简单的门栓MyLatch,我们提到,Java并发包中已经提供了类似工具,就是CountDownLatch。它的大概含义是指,它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。


CountDownLatch里有一个计数,这个计数通过构造方法进行传递:

public CountDownLatch(int count)


多个线程可以基于这个计数进行协作,它的主要方法有:

public void await() throws InterruptedException

public boolean await(long timeout, TimeUnit unit) throws InterruptedException

public void countDown()


await()检查计数是否为0,如果大于0,就等待,await()可以被中断,也可以设置最长等待时间。countDown检查计数,如果已经为0,直接返回,否则减少计数,如果新的计数变为0,则唤醒所有等待的线程。


68节 ,我们介绍了门栓的两种应用场景,一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用CountDownLatch重新演示下。


在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用await,主线程调用countDown,示例代码如下:

public class RacerWithCountDownLatch {
static class Racer extends Thread {
CountDownLatch latch;

public Racer(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
try {
this.latch.await();
System.out.println(getName()
+ " start run "+System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
}

public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch latch = new CountDownLatch(1);
Thread[] racers = new Thread[num];
for (int i = 0; i             racers[i] = new Racer(latch);
racers[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}


代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用countDown,主线程调用await进行等待,示例代码如下:

public class MasterWorkerDemo {
static class Worker extends Thread {
CountDownLatch latch;

public Worker(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
try {
// simulate working on task
Thread.sleep((int) (Math.random() * 1000));

// simulate exception
if (Math.random()                     throw new RuntimeException("bad luck");
}
} catch (InterruptedException e) {
} finally {
this.latch.countDown();
}
}
}

public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
CountDownLatch latch = new CountDownLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for (int i = 0; i             workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
}


需要强调的是,在这里,countDown的调用应该放到finally语句中 ,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。


循环栅栏CyclicBarrier

我们在 68节 使用wait/notify实现了一个简单的集合点AssemblePoint,我们提到,Java并发包中已经提供了类似工具,就是CyclicBarrier。它的大概含义是指,它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。


CyclicBarrier特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。


与CountDownLatch类似,它也有一个数字,但表示的是参与的线程个数,这个数字通过构造方法进行传递:

public CyclicBarrier(int parties)







请到「今天看啥」查看全文