(点击上方公众号,可快速关注)
来源:赢时胜首席技术官--zhaoxin ,
blog.csdn.net/javacoffe/article/details/41827705
如有好文章投稿,请点击 → 这里了解详情
1前言
在并发系统中很多地方都要用到作为资源池的并行化队列,如在大多数应用中,一个或多个生产者线程生产数据,一个或多个消费者消费数据。这些数据元素可以是需要执行的的任务、要解释的键盘输入、待处理的订单或需要解码的数据包。有的时候生产者会突然加速,产生数据的速度会远远超过消费者使用数据的速度。为了使消费者可以跟得上生产者,需要生产者和消费者之间放置一个缓冲区,将那些来不及处理的数据先放在缓冲区中,以使得他们能被尽快的消费。此时多会采用队列作为池,来实现生产者和消费者之间的缓冲区。通常来说池有以下几种不同的变化方式:
Ø 池可以是有界的或者是无界的。有界池存放有限个元素,该界限称为池的容量。无界池可以存放任意数量的元素。当要求生产者不需要过快的超过消费者时,即生产和消费是一种松弛的同步,就要用到有界池。反之当不需要设置固定的界限,限制生产者可比消费者快多少的程度时,就要用到无界池。
Ø 操作池的方法可以是完全、部分或同步的。
u 若一个方法不需要等待某个条件成立,则称为该方法是完全的。如从空队列中删除元素时,可以立刻抛出返回错误。因此当生产者或消费者线程有比等待调用生效还要好的其他事情可处理时,完全化的操作接口非常适用。
u 若一个方法的调用需要等待某个条件成立,则称为该方法为部分的。如从空队列中删除元素的操作会被阻塞,直到池中有可用元素才返回。当生产者或消费者线程除了等待调用生效以外,没有其他更好的事情可做时,部分化的操作接口非常适用。
u 若一个方法需要等待另一个方法与他的调用相重叠,则称该方法为同步的。如一个向队列中添加元素的方法调用会被阻塞,直到被添加的元素被另一个线程的方法取用。当生产者和消费者线程之间需要进行通信或严格同步时,同步化的操作接口非常适用。
池可以基于各种数据结构来实现,进而实现各种公平策略。如基于队列的先进先出的池,基于栈的后进先出的池,以及其他的一些若公平性原则的池。但无论如何实现池的公平策略,都要解决在高并行环境下高性能、低消耗、无干扰的交互,事实上要达到这样级别的并行性并非易事。下面我们将分为部分有界队列池、完全无界队列池、以及同步化队列池几种情况,来探讨并行环境下基于队列的池,如何实现高级别并行的相关技术细节。
2 部分有界队列
下面的代码基于链表实现有界队列,并且使用Java5之后提供的原子变量、可重入锁、条件变量等手段实现并行环境下的线程交互控制。通过原子变量、可重入锁、条件变量等技术手段,可以实现更加细化的锁粒度的控制,相比之前Java提供的同步锁机制,这样实现能够实现更加高效的线程交互和更小的总体消耗。
首先定义基于链表的队列的元素节点对象:
publicclassBoundedNode {
public Object value;
public BoundedNode next;
public BoundedNode(Object x){
value=x;
next=null;
}
}
然后定义队列的主体实现,其中主要包括:入队和出对操作。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
publicclassBoundedQueue {
ReentrantLock enqlock,deqlock;
Condition notempty,notfull;
AtomicInteger size;
BoundedNode head,tail;//队列头哨兵和尾哨兵
intcapacity;
publicBoundedQueue(intcapacity){
this.capacity=capacity;
head=newBoundedNode(null);
tail=head;
size=newAtomicInteger(0);
enqlock=newReentrantLock();
deqlock=newReentrantLock();
notfull=enqlock.newCondition();
notempty=deqlock.newCondition();
}
publicvoidenq(Object x) throwsInterruptedException{
boolean weakdeq=false;
//入队者首先获取入队锁
enqlock.lock();
try{
//判断队列是否为满,通过循环判断,结合上面的加锁,因此此方法也称为自旋//加锁,优势效率较高,缺点造成CPU消耗较大
while(size.get()==capacity){
//如果队列满,则在“不满”条件上等待,直到队列不满的条件发生,等待时会//暂时释放入队锁
notfull.await();
}
//如果“不满”条件满足,则构建新的队列元素,并将新的队列元素挂接到队列//尾部
BoundedNode e=new BoundedNode(x);
tail.next=tail=e;
//获取元素入队前队列容量,并在获取后将入队前队列容量增加1
if(size.getAndIncrement()==0){
//如果入队前队列容量等于0,则说明有出队线程正在等待出队条件notempty
//发生,因此要将相关标志置为true
weakdeq=true;
}
}finally{
//入队者释放入队锁
enqlock.unlock();
}
//判断出队等待标志
if(weakdeq){
//入队线程获取出队锁
deqlock.lock();
try{
//触发出队条件,即队列“不空”条件,使等待出队的线程能够继续执行
notempty.signalAll();
}finally{
//入队线程释放出队锁
deqlock.unlock();
}
}
}
publicObject deq() throwsInterruptedException{
Object result=null;
boolean weakenq=false;
//出队者首先获取出队锁
deqlock.lock();
try{
//判断队列是否为空,通过循环判断,结合上面的加锁,因此此方法也称为自旋//加锁,优势效率较高,缺点造成CPU消耗较大
while(size.get()==0){
//如果队列空,则在“不空”条件上等待,直到队列不空的条件发生,等待时会//暂时释放出队锁
notempty.await();
}
//如果“不空”条件满足,则通过队列头部哨兵获取首节点,并获取队列元素值
result=head.next.value;
head=head.next;
//获取元素出队前队列容量,并在获取后将出队前队列容量减少1
if(size.getAndDecrement()==capacity){
//如果出队前队列容量等于队列限额,则说明有入队线程正在等待入队条件//notfull发生,因此要将相关标志置为true
weakenq=true;
}
}finally{
//出队者释放出队锁
deqlock.unlock();
}
//判断入队等待标志
if(weakenq){
//出队线程获取入队锁
enqlock.lock();
try{
//触发入队条件,即队列“不满”条件,使等待入队的线程能够继续执行
notfull.signalAll();
}finally{
//出队线程释放入队锁
enqlock.unlock();
}
}
return result;
}
}
分别使用两个不同的锁,enqlock和deqlock,来保证在一个时刻最多只有一个入队者和一个出队者可以操作队列对象域。这种采用两个而不是一个锁的方式能够保证入队者不会锁住出队者,反之亦然。每个锁都有一个与之相关的条件变量。enqlock与notfull条件相关,即入队需要等待队列不满的条件;deqlock与notempty条件相关,即出队需要等待队列不空的条件。同时由于队列是有界的,所以必须跟踪空槽的个数。size域是用来记录队列中并发对象个数的整型原子变量,并且入队方法enq()会增加该值,出队方法deq()会减少该值。
一旦空槽数大于0,入队者就会继续执行。注意一旦入队者发现有空槽,则当入队者还在继续执行时,其他线程不能向队列中插入元素,因为其他的入队者被锁定,只会有一个并发出队者可以增加空槽数目。
当出队者将队列从不满变成满时,他获得enqlock并对notfull发出信号,即使size域没有被enqlock保护,也由于出队者在触发条件之前已经先获得了enqlock,所以出队者不可能在入队者的两个操作步骤之间产生信号。
这种实现的一个缺点就是并发的enq()方法和deq()方法会相互干扰,但又不通过锁。所有的方法对size域调用getAndIncrement()和getAndDecrement(),这些方法可能会产生比通常的读-写开销更大,且可能引起顺序瓶颈。减少这种干扰的一种方法,就是将这个域分成两个计数器,一个由deq()减1的整型域enqsize和一个由enq()加1的整型域deqsize。调用enq()的线程检测enqsize,只要他小于队列容量限额,就继续执行。当该域达到队列容量限额时,线程锁住deqlock并将deqsize加到enqsize中。当入队者的大小估值变得非常大时,这种技术能够分散地同步,而不是对每个方法调用进行同步。
看完本文有收获?请转发分享给更多人
关注「ImportNew」,看技术干货