紧密相关的同步化队列现在来看一种紧密相关的同步方式。一个或多个生产者线程生产数据,并由一个或多个消费者线程按照先进先出的次序来获取。但是,生产者与消费者之间必须相互回合,即向队列中放入一个元素的生产者应阻塞直到该元素被另外一个消费者取出,反之亦然。其实现的伪代码描述如下:
这是一种基于管程的同步队列实现。由于这个队列设计非常简单,所以他的同步代价很高。在每个线程可能唤醒另一个线程的时间点,无论是入队者还是出队者都会唤醒所有的等待线程,从而唤醒的次数是等待线程数目的平方。尽管可以使用条件对象来减少唤醒次数,但由于仍需要阻塞每次调用,所以开销很大。
为了减少同步队列的同步开销,我们考虑另外一种同步队列的实现,在该实现中将入队与出队分两步完成。如出队从一个空队列删除元素的过程为,第一步,他将一个保留对象放入队列,表示该出队者这在等待一个准备与之会合的入队者。然后,出队者在这个保留对象的flag标志上自旋等待。第二步,当一个入队者发现该保留时,他通过存放一个元素并设置保留对象的flag来通知出队者完成保留。同样,入队者也能够通过创建自己的保留对象,并在保留对象的flag标志上自旋来等待会合同伴。在任意时刻,队列本身或者包含出队者的保留对象或者包含入队者的保留对象,或者为空。因此操作队列的线程共有4种交互对象,即入队者线程、入队者保留、出队者线程、出队者保留,由于需要进行紧密的同步,因此他们之间的对应关系如下:
即入队者线程要协助处理出队者保留,同样出队者线程要协助处理入队者保留。这种结构称为双重数据结构,其核心是方法是通过两个步骤来生效的:保留和完成。该结构具有很多很好的性质。首先,正在等待的线程可以在一个本地缓存标志上自旋,而这时可扩展性的基础。其次,他很自然的保证了公平性。保留按照他们到达的次序来排队,从而保证请求也按照同样的次序完成,因此这种结构是可以线性化的,因为每个部分方法调用在他完成时是可以排序的。
该队列结构可以用节点组成的链表来实现,其中节点或者表示一个等待出队的元素或者表示一个等待完成的保留,由节点的Type域指定。任何时候,所有的队列节点都应具备相同的类型,即或者全部是等待出队的元素,或者全部是等待完成的保留。当一个元素入队时,节点的item域存放该元素;当元素出队时,节点的item域被重新设置为null。当一个保留入队时,节点的item域为null;当保留被一个入队者完成时,节点的item域被重新设置为一个元素。
publicenumNodeType {
ITEM,RESERVATION;
}
其中,ITEM代表节点元素,RESERVATION代表保留对象。
import java.util.concurrent.atomic.AtomicReference;
publicclassSynNode {
volatileNodeType type;//节点类型
//节点元素,元素值为Java泛化类型
volatileAtomicReference item;
volatileAtomicReference> next;
SynNode(E eitem,NodeType etype){
item=newAtomicReference(eitem);
next=newAtomicReference>(null);
type=etype;
}
}
接下来定义队列的主体实现,其中主要包括:入队和出对操作。
import java.util.concurrent.atomic.AtomicReference;
publicclassSynchronousDualQueue {
privateAtomicReference> head,tail;//头尾哨兵节点
publicSynchronousDualQueue(){
//初始化空队列,创建一个具有任意值的节点,并让头尾指针指向该节点
SynNode snode=new SynNode(null,NodeType.ITEM);
head=newAtomicReference>(snode);
tail=newAtomicReference>(snode);
}
//入队操作
publicvoidenq(E item){
//创建将被入队的新节点
SynNode offer=newSynNode(item,NodeType.ITEM);
while(true){
//获取头尾哨兵节点
SynNode t=tail.get();
SynNode h=head.get();
//检验队列是否为空或者是否包含已入队元素的出队保留
if(t==h||t.type==NodeType.ITEM){
//读取tail节点的后继
SynNode n=t.next.get();
//判断读取的tail值是一致的,即tail的状态不会被并发线程更改
if(t==tail.get()){
//如果tail域没有指向队列的最后一个节点,则尝试推进tail,并重新开始
if(n!=null){
//尝试推进tail
tail.compareAndSet(t,n);
}
//如果tail域就是队列最后一个元素,那么尝试将tail的后继指向新增节点,//即将新增节点挂接到队尾
elseif(t.next.compareAndSet(n,offer)){
//如果成功挂接,那么尝试推进tail域,使其指向新增节点
tail.compareAndSet(t,offer);
//自旋等待,等待一个出队者通过设置该节点的item域为null来通知该元素已//经出队。这是因为这是一个紧密同步的队列,入队元素必须等待使用它的出队//者
while(offer.item.get()==item);
//一旦出队成功,就尝试移动头指针,以便进行清理。这是因为出队是从头节点//出队。通过将新增节点设置为head哨兵节点,实现对被使用节点的清理
h=head.get();
if(offer==h.next.get()){
head.compareAndSet(h,offer);
}
return;
}
}
}
//如果存在正在等待完成的出队者的保留,那么就找出一个保留并完成它
else{
//读取head的后继节点
SynNode node=h.next.get();
//判断读到的值是一致的,即运行过程中状态的一致性不能被并发线程所破坏,//主要通过尾指针是否一致、头指针是否一致、以及首节点是否为空来判断,这//些状态都可能在运行中被并发线程改变,因此只要有一个状态被改变,即认为//整体状态被破坏了,需要重新开始
if(t!=tail.get()||h!=head.get()||node==null){
continue;
}
//如果状态一致,那么尝试着将节点的item域从null改为要入队的元素。因为//出队者保留等待的是一个入队者,所以要将item域由null置为入队元素的//item
boolean success=node.item.compareAndSet(null,item);
//不管上一步是否成功,都尝试推进head
head.compareAndSet(h,node);
//如果推进head成功,该方法返回,否则重试
if(success){
return;
}
}
}
}
//出队操作,其操作逻辑与入队基本相似,在此不再赘述
public E deq(){
E result=null;
while(true){
SynNode h=head.get();
SynNode t=tail.get();
if(h==t || h.type==NodeType.ITEM){
SynNode n=h.next.get();
if(h==head.get()){
if(n==null){
tail.compareAndSet(t,new SynNode(null,NodeType.RESERVATION));
}elseif(head.compareAndSet(h,n)){
while(n.item.get()==null);
t=tail.get();
if(n==t.next.get()){
result=n.item.get();
tail.compareAndSet(t,n);
}
return result;
}
}
}
else{
SynNode node=t.next.get();
result=node.item.get();
if(t!=tail.get()||h!=head.get()||node==null){
continue;
}
boolean success=node.item.compareAndSet(result,null);
tail.compareAndSet(t,node);
if(success){
return result;
}
}
}
}
}