构造方法
publicSynchronousQueue(){this(false);// 默认非公平 TransferStack}publicSynchronousQueue(booleanfair){transferer=fair?newTransferQueue<E>():newTransferStack<E>();}TransferStack
核心变量
volatileSNodehead;// 栈顶staticfinalUnsafeUNSAFE;staticlongheadOffset;staticfinalintREQUEST=0;// 消费者结点(take/poll)staticfinalintDATA=1;// 生产者结点(put/offer)staticfinalintFULFILLING=2;// 匹配中标记位SNode
staticfinalclassSNode{volatileSNodehead;// 栈头,所有阻塞线程压入栈顶volatileSNodenext;// 栈下一节点volatileSNodematch;// 匹配成功的对方节点volatileThreadwaiter;// 阻塞等待的线程Objectitem;// 数据:生产者存元素,消费者为nullintmode;// 节点类型:0未定义,1生产者(DATA),2消费者(REQUEST)// Unsafe内存偏移,自旋CAS必备privatestaticfinalsun.misc.UnsafeUNSAFE;privatestaticfinallongmatchOffset;privatestaticfinallongnextOffset;privatestaticfinallongitemOffset;static{try{UNSAFE=sun.misc.Unsafe.getUnsafe();Class<?>k=SNode.class;matchOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("match"));nextOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("next"));itemOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("item"));}catch(Exceptione){thrownewError(e);}}}casHead
// CAS修改栈头booleancasHead(SNodeh,SNodenh){returnh==head&&UNSAFE.compareAndSwapObject(this,headOffset,h,nh);}casMatch
booleancasMatch(SNodecmp,SNodeval){returnUNSAFE.compareAndSwapObject(this,matchOffset,cmp,val);}casNext
// CAS 操作辅助方法booleancasNext(SNodecmp,SNodeval){returncmp==next&&UNSAFE.compareAndSwapObject(this,nextOffset,cmp,val);}tryMatch
booleantryMatch(SNodes){if(match==null&&casMatch(null,s)){Threadw=waiter;if(w!=null){waiter=null;LockSupport.unpark(w);// 唤醒对方线程}returntrue;}returnfalse;}tryCancel
// 取消节点,标记match为自身,表示取消等待booleantryCancel(){returncasMatch(null,this);}isCanceled
booleanisCancelled(){returnmatch==this;}transfer
- 生产者调用:transfer(item, true, nanos) 存入数据,阻塞等待消费者取走
- 消费者调用:transfer(null, true, nanos) 获取数据,阻塞等待生产者存入
- 第二个参数 timed:是否限时等待;nanos 超时时间
Etransfer(Ee,booleantimed,longnanos){SNodes=null;intmode=(e==null)?REQUEST:DATA;for(;;){SNodeh=head;// 分支1:栈空 || 栈顶和当前线程同类型(全生产者/全消费者),入栈阻塞等待if(h==null||h.mode==mode){// 超时且没时间等待,直接返回nullif(timed&&nanos<=0){if(h!=null&&h.isCancelled())casHead(h,h.next);elsereturnnull;}// 能等待:创建节点压入栈顶elseif(casHead(h,s=snode(s,e,h,mode))){// 阻塞等待匹配SNodem=awaitFulfill(s,timed,nanos);// m==自身:等待被中断/超时取消,清理栈返回nullif(m==s){clean(s);returnnull;}// 辅助弹出已完成匹配的节点,减轻后续CAS竞争if((h=head)!=null&&h.next==s)casHead(h,s.next);// 返回数据:消费者拿生产者item,生产者返回自身数据return(E)((mode==REQUEST)?m.item:s.item);}}// 分支2:栈顶是互补节点,且不在交接中,主动发起匹配elseif(!isFulfilling(h.mode)){// 栈顶节点已取消,弹出重试if(h.isCancelled())casHead(h,h.next);// 压入一个带FULFILLING标记的交接节点elseif(casHead(h,s=snode(s,e,h,FULFILLING|mode))){for(;;){SNodem=s.next;// 等待节点消失,本次匹配失败,重置循环if(m==null){casHead(s,null);s=null;break;}SNodemn=m.next;// CAS绑定匹配关系,唤醒对方线程if(m.tryMatch(s)){// 一次性弹出交接节点+等待节点casHead(s,mn);return(E)((mode==REQUEST)?m.item:s.item);}else// 匹配竞争失败,辅助断开失效节点s.casNext(m,mn);}}}// 分支3:栈顶正在交接,当前线程协助完成匹配(加速清理栈)else{SNodem=h.next;if(m==null)casHead(h,null);else{SNodemn=m.next;if(m.tryMatch(h))casHead(h,mn);elseh.casNext(m,mn);}}}}awaitFulfill
SNodeawaitFulfill(SNodes,booleantimed,longnanos){finallongdeadline=timed?System.nanoTime()+nanos:0L;Threadw=Thread.currentThread();// 自旋次数优化:栈顶就是自己时多自旋,减少park切换intspins=(head.next==s)?1:0;for(;;){SNodem=s.match;// 已经匹配成功,返回对方节点if(m!=null)returnm;// 线程中断 / 超时,标记取消if(w.isInterrupted()||(timed&&nanos<=0)){s.tryCancel();returns;}// 自旋超过阈值,设置waiter准备parkif(spins>32)s.waiter=w;elseif((head!=s||s.match!=null)&&++spins==0)spins=32;// 休眠让出CPUif(!timed)LockSupport.park(this);elseLockSupport.parkNanos(this,nanos);if(timed)nanos=deadline-System.nanoTime();}}snode
// 构建/复用SNode,缓存失效节点减少创建开销staticSNodesnode(SNodes,Objecte,SNodenext,intmode){if(s==null)s=newSNode(e);s.mode=mode;s.next=next;s.item=e;s.match=null;s.waiter=null;returns;}入队方法
put
publicvoidput(Ee)throwsInterruptedException{if(e==null)thrownewNullPointerException();// transfer 参数:e!=null 代表是存放数据的生产者;不限时阻塞// e == null:表示当前是消费者if(transferer.transfer(e,false,0)==null){Thread.interrupted();thrownewInterruptedException();}}add
publicbooleanadd(Ee){if(offer(e))returntrue;elsethrownewIllegalStateException("Queue full");}offer(不超时)
publicbooleanoffer(Ee){if(e==null)thrownewNullPointerException();// timed=true,nanos=0:自旋一次,不park,匹配不到直接返回returntransferer.transfer(e,true,0)!=null;}offer(超时)
publicbooleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException{if(e==null)thrownewNullPointerException();longnanos=unit.toNanos(timeout);Ex=transferer.transfer(e,true,nanos);if(x!=null)returntrue;if(Thread.interrupted())thrownewInterruptedException();returnfalse;}出队方法
take
publicEtake()throwsInterruptedException{Ee=transferer.transfer(null,false,0);if(e!=null)returne;Thread.interrupted();thrownewInterruptedException();}poll
publicEpoll(){returntransferer.transfer(null,true,0);}poll(超时)
publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{Ee=transferer.transfer(null,true,unit.toNanos(timeout));if(e!=null||!Thread.interrupted())returne;thrownewInterruptedException();}总结
transfer 三大分支
- 分支 1:if (h == null || h.mode == mode) 入栈阻塞
- 触发场景:栈空,或者栈顶和当前线程是同一角色(一堆生产者 / 一堆消费者),没法直接匹配。
- 执行步骤:
- 如果开启超时且剩余时间为 0,清理栈内取消节点直接返回 null;
- 调用snode()复用节点,CAS 替换 head 压栈;
- awaitFulfill()自旋 park 阻塞,等待其他互补线程来匹配自己;
- 先自旋一小段时间优化,减少线程切换开销;
- 自旋超过阈值,标记 waiter 并调用 LockSupport.park 休眠;
- 阻塞唤醒后判断:
- 返回 m != 自身:匹配成功,清理栈辅助弹出节点,返回数据;
- 返回 m == 自身:中断 / 超时取消,调用clean()清理无效节点,返回 null。
- 分支 2:else if (!isFulfilling (h.mode)) 主动发起匹配
- 触发场景:栈顶是互补角色,且栈顶节点没有正在交接。
- 执行步骤:
- 栈顶节点已取消,弹出 head 重试循环;
- 创建带 FULFILLING|mode 的交接节点压入栈顶;
- 内层无限循环拿到栈下一个等待节点 m;
- m.tryMatch() CAS 绑定匹配关系,唤醒对方阻塞线程;
- CAS 一次性弹出交接节点 s 和等待节点 m,返回交付数据。
- 分支 3:else 协助交接(性能优化分支)
- 触发场景:栈顶节点已经带 FULFILLING 标记,正在和下方节点匹配。
- 当前线程不做新的入栈、不发起新匹配,只做辅助工作:
- 帮忙执行tryMatch完成栈顶未做完的绑定;
- CAS 断开失效、已取消的节点;
- 弹出匹配完成的双节点,减少后续线程的 CAS 竞争,大幅提升高并发吞吐。
流程图解
先 put 再 take
初始状态
head = null,栈空
执行 put
transfer(“Apple”, false, 0)
mode = DATA,h=null 进入分支 1CAS 创建 S_p (DATA, item=“Apple”),压栈,head 指向 S_p
head ↓ [S_p | DATA | item=Apple | waiter=Tp | match=null] next=null- 执行 awaitFulfill(S_p)
自旋几次无匹配对象,设置 waiter=Tp,调用 LockSupport.park() - Tp 阻塞挂起,停在 awaitFulfill,等待 match 被赋值
执行 take
- transfer(null, false, 0),mode=REQUEST
读取栈顶 h=S_p,h.mode=DATA ≠ REQUEST,且 !isFulfilling(h.mode) → 分支 2 - 创建交接节点 S_f:mode = FULFILLING | REQUEST
- CAS 将 S_f 压入栈顶,head 更新为 S_f
head ↓ [S_f | FULFILLING|REQUEST | next=S_p] ↓ [S_p | DATA | item=Apple | waiter=Tp | match=null]- 内层循环:m = S_f.next = S_p(待匹配生产者节点)
- 执行 S_p.tryMatch(S_f) 核心绑定:
- CAS 把 S_p.match = S_f
- 拿到 S_p.waiter = Tp,执行 LockSupport.unpark(Tp) 唤醒生产者
- 返回 true,匹配绑定完成
- CAS 一次性弹出栈顶两个节点:casHead(S_f, S_p.next),S_p.next 是 null,执行后 head = null,栈清空
- 当前线程是 REQUEST 消费者,返回 m.item = S_p.item = “Apple”
- take () 方法拿到 “Apple”,消费者 Tc 执行完毕退出
put 线程被唤醒
Tp 从 park 处唤醒,回到 awaitFulfill 循环
- 读取 S_p.match = S_f,match 不为空,直接 return S_f(匹配节点)
- 回到 transfer 分支 1 收尾逻辑:辅助清理栈
- if ((h = head) != null && h.next == s) casHead(h, s.next);
- mode 是 DATA 生产者,返回 s.item = “Apple”
- put 方法 transfer 返回非 null,无中断,put 执行完成,生产者 Tp 退出
先 take 再 put
- Tc 先 take,栈空走分支 1,压入 S_c (REQUEST),Tc park 阻塞
head → [S_c | REQUEST | waiter=Tc]- Tp 执行 put (“Banana”),栈顶 mode 互补,走分支 2
创建 FULFILLING|DATA 交接节点 S_f 压栈
head → [S_f | FULFILLING|DATA] → [S_c | REQUEST]- S_c.tryMatch(S_f),绑定 match,unpark (Tc)
- 弹出双节点,栈清空;生产者返回自己的 Banana,消费者唤醒后拿到数据