啥?SynchronousQueue和钟点房一个道理

啥?SynchronousQueue和钟点房一个道理

今天这篇文章,我们继续讲架构师大刘的故事。

大刘有段时间经常会给一些程序员讲课。这一方面是由于团队培训的需要,一方面也是大刘自身想搞搞凡尔赛,嘚瑟一下自身的实力。

大刘讲课是允许公司任何一个人进去听的。提前一个星期把主题公布在公司群里,有人想听到日子直接去就是了。

有一次,大刘在聊并发话题的时候,为了彰显自己确实是个并发达人,用了个 SynchronousQueue 举例子。他说这个队列其实没有容积的概念,就是线程持有数据互相匹配。

嗯,谈到这里还是要说一下,大刘其实也不太懂 SynchronousQueue。只是一来这东西没人用,自然就没人懂;二来它的概念也比较晦涩,有些时候比较违背直觉,所以,即使随口说的一些话可能不太对,也未必会被发现,还能给人一种不明觉厉的感觉。

大刘用过几次,感觉良好。因此没事儿就要秀一下 SynchronousQueue,表示自己这么生僻的也懂,并发达人的名头是没有叫错的。

也就那一次,恰恰被人拆了台。

当时课上来了个新入职的技术,此人长得中等身材,相貌平平,只是脸却长的像种地多年的老农的巴掌。脸上的疙瘩如同老农巴掌上的老茧。这人姓张,这里由于他脸长得像个大巴掌,那就暂且叫他巴掌张。

这个巴掌张打断了大刘的话,言之凿凿说大刘说的是错的,说他看过这个 SynchronousQueue,并不是大刘说的这样。

大刘有点心虚,脖子渗出了一圈汗,但是并发达人的称呼大刘并不想丢掉。于是说了一大堆云里雾里的废话,把话题带偏了开去。并告诉巴掌张,下回要和他在这个舞台上 PK 一二, 要好好看看谁是真正的 SynchronousQueue 的知心朋友。

由于大刘感觉被巴掌张的巴掌糊了脸,便就此下了决心要研究透 SynchronousQueue。

Google 和百度一起查,东西合璧,洋为中用,搞了好是一阵子。最后有个犄角旮旯的小破网站,有人说了这么一句话:

SynchronousQueue 的目的就是为了接头,为了匹配,当接上头了就双方合作愉快,整个工作完成。但是一旦在接头中,任何一方还没到达,那么另一方就必须阻塞着等待。

这句话一下子就敲开了大刘的脑壳,让聪明的智商重新占领了高地。

为啥这句话就点亮了大刘那本来已经像灯泡的脑袋了呢?因为大刘想起了他每次的面试经历,就和这个接头是一样的。

大刘每次去面试,都很规矩的提前赶到新公司。但是大部分情况,时间到了之后都需要等很长时间才开始面试。大刘那时候也年轻,只是以为领导忙,所以倒也恭恭敬敬的等着。

直到大刘自己当了领导,去面试别人的时候,被 HR 委婉的提醒了下,要让候选人等一会儿再过去,显的公司业务很忙,让候选人对公司保持一定的敬畏。那时候,大刘才知道这是一种 PUA 术……

大刘对照着自己的面试经历,一下就理解了 SynchronousQueue 的概念。

SynchronousQueue 本身是为了交接、匹配而存在的。当一个线程往 SynchronousQueue 放东西,发现没线程在等着拿,就给阻塞掉——这就像面试者来早了等面试官。

当一个线程去 SynchronousQueue 拿东西,发现没东西,就去等的时候——就像面试官来早了等面试者。

搞懂 SynchronousQueue 的时候,正是一个冬天,屋外面的寒风在虎虎生威,屋里面的大刘在熠熠生辉。

只是一个堂而皇之摆在 JDK 底层并发包中的队列结构,SynchronousQueue 当然没那么简单,里面还存在着亿点点细节。

所以,大刘在整体方向搞懂之后,开始研究起了细节。他要奋发,狠狠把巴掌张的嚣张气焰压下去,大刘要当公司技术的头牌。

回到现实里,SynchronousQueue 真正的目的就是为了让两个线程的工作结果进行交接。这没什么问题。但是,在这个交接中是需要严格保密的,没有人可以窥视。

嗯,没错,就和你约了女朋友去钟点房那样的不能被窥视。

好,围绕这个 SynchronousQueue 的钟点房,咱们通过源代码,来看这亿点点细节。

首先,钟点房严格保密,里面是多少人,就不能让人知道。所以,就不能让别人通过方法得到具体的数据。对于 SynchronousQueue 来说,自然就是通过 size() 你得不到什么信息。

/**
* Always returns zero.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return zero
*/
public int size() {
  return 0;
}

/**
* Always returns {@code true}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return {@code true}
*/
public boolean isEmpty() {
  return true;
}

其次,钟点房也不能随便进去查房,看看都是谁。所以,自然就不能迭代。

/**
* Returns an empty iterator in which {@code hasNext} always returns
* {@code false}.
*
* @return an empty iterator
*/
public Iterator<E> iterator() {
  return Collections.emptyIterator();
}

再次,钟点房保护隐私,它也不能让你钻了漏子,不告诉你 XXX 是不是躲在了钟点房里。所以,你也不能知道钟点房里有没有某个人。

/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element
* @return {@code false}
*/
public boolean contains(Object o) {
  return false;
}

/**
* Returns {@code false} unless the given collection is empty.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param c the collection
* @return {@code false} unless given collection is empty
*/
public boolean containsAll(Collection<?> c) {
  return c.isEmpty();
}

自然,钟点房也没什么权力赶人出去。

/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element to remove
* @return {@code false}
*/
public boolean remove(Object o) {
  return false;
}

当然,作为一个商业化的钟点房,SynchronousQueue 还是很注意安全的,它贴心的提供了紧急转移的手段。

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException            {@inheritDoc}
* @throws NullPointerException          {@inheritDoc}
* @throws IllegalArgumentException      {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
  if (c == null)
    throw new NullPointerException();
  if (c == this)
    throw new IllegalArgumentException();
  
  int n = 0;
    for (E e; (e = poll()) != null;) {
      c.add(e);
      ++n;
    }
  return n;
}

/**	
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException            {@inheritDoc}
* @throws NullPointerException          {@inheritDoc}
* @throws IllegalArgumentException      {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
  if (c == null)
    throw new NullPointerException();
  if (c == this)
    throw new IllegalArgumentException();
  
  int n = 0;
    for (E e; n < maxElements && (e = poll()) != null;) {
      c.add(e);
      ++n;
    }
  return n;
}

最后,钟点房就只能搞搞交接工作了。交接吗,自然是有交有接的,交的就得带东西。

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  // put:带着东西进屋子
  if (transferer.transfer(e, false, 0) == null) {
    Thread.interrupted();
    throw new InterruptedException();
  }
}

接的肯定不会带着东西,得留地方拿东西。

public E take() throws InterruptedException {
  // take:从屋子里把东西拿出来
  E e = transferer.transfer(null, false, 0);
  if (e != null)
    return e;
  Thread.interrupted();
  throw new InterruptedException();
}

但是呢,这交接工作啊,得在专人安排下进行。

为什么需要专人来帮忙?因为有时候我们的钟点房太受欢迎了,客人多,得排队管管。管这些排队的就是 Transfer,它是钟点房的经理。

/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization.  Since
* this is accessed only at most once per public method, there
* isn"t a noticeable performance penalty for using volatile
* instead of final here.
*/
private transient volatile Transferer<E> transferer;

/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
  /**
  * Performs a put or take.
  *
  * @param e if non-null, the item to be handed to a consumer;
  * if null, requests that transfer return an item
  * offered by producer.
  * @param timed if this operation should timeout
  * @param nanos the timeout, in nanoseconds
  * @return if non-null, the item provided or received; if null,
  * the operation failed due to timeout or interrupt --
  * the caller can distinguish which of these occurred
  * by checking Thread.interrupted.
  */
  abstract E transfer(E e, boolean timed, long nanos);
}

Transfer 经理每次开门营业的时候,会收到总部给的牌子,告诉他管理工作要注意方式方法,比如公平有效,比如优先服务 VIP 客人之类的。

/**
* 默认给vip客人开点后门
*/
public SynchronousQueue() {
  this(false);
}

/**
* 总部递牌子,告诉Transfer到底是公平还是不公平,
*/
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

先看看适合劳苦大众的公平模式,先来先享受,晚来没折扣。

static final class TransferQueue<E> extends Transferer<E> {
  static final class QNode{...}
  transient volatile QNode head;    
  transient volatile QNode tail;
  transient volatile QNode cleanMe;
  TransferQueue() {
	//经典的链表套路,先搞个虚拟的头结点
    QNode h = new QNode(null, false); 
    head = h;
    tail = h;
  }
  ……
  ……

QNode 就是 Transfer 经理需要的牌子,上面记录点信息,别到时候弄错了。

static final class QNode {
  volatile QNode next; // 下一个排队的哥们儿
  volatile Object item; // 这次哥们带来的要交接的东西
  volatile Thread waiter; // 交接的线程
  final boolean isData;	// isData == true表示带着东西

  QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
  }
	
  // ...省略一系列CAS方法
}

怎么搞,秘密都在 transfer() 里。

@SuppressWarnings("unchecked")
  E transfer(E e, boolean timed, long nanos) {
  //...先省略细节        
}

transfer 本质就是一直在等待交接完成或者交接被中断,被取消,或者等待超时。

for (;;) {
  QNode t = tail;
  QNode h = head;
	//因为初始化是在构造函数里搞得,可能构造函数没有执行完,就被用上了,就会出现t或者h为null的情况
  if (t == null || h == null)         
    continue; //啥也不能做
    
	//h==t表示没人,t.isData == isData表示过来的哥们和前面的哥们目的一样,那就只能考虑排队等着了。
  if (h == t || t.isData == isData) { 
    QNode tn = t.next;
    //线程不安全需要考虑的,现在的尾巴不对,指错了,重新确认下
		if (t != tail)                  
      continue;
      
		//队尾确定了,发现又来了人,把尾巴指向新来的人
    if (tn != null) {             
      advanceTail(t, tn);
      continue;
    }
		
    //超时了,别等了
    if (timed && nanos <= 0)
      return null;
      
		//总算没事儿了,哥们可以登记进屋了
    if (s == null)
      s = new QNode(e, isData);
      
		//中间可能有人插队,只能再等等
    if (!t.casNext(null, s))        
      continue;
				
		//准备进屋等着约的人
    advanceTail(t, s);              
    Object x = awaitFulfill(s, e, timed, nanos);
    
		//同一个人出来,那就是任务失败了
    if (x == s) {
      //清理下                   
      clean(t, s);
      return null;
    }
    
    if (!s.isOffList()) { //还没脱队
      advanceHead(t, s); //排前面单独处理
      if (x != null) //交接成功设一下标记
        s.item = s;
        s.waiter = null;
    }
    
    return (x != null) ? (E)x : e;

这段是不是看着很头痛?其实 Transfer 这小子也头痛。

它首先要面临的第一个问题:资源竞争的问题。

客人源源不断的来,由于 Transfer 强迫症,他想每次必须从绝对的队头或者队尾巴开始,所以,每次都要判断下,到底他看到的队头或者队尾,是不是真正的队头、队尾。

确定没问题了,新来的客人就开始被打造成真正的队尾。

然后,成为队尾的哥们就可以等着属于自己的 Mr.Right 过来交接了。等着交接一直到成功或者失败的方法就是 awaitFulfill(t, tn)。

这边有人在等待,同时另外一边,交接的人们也开始陆续过来了。

else { // complementary-mode
  QNode m = h.next; // node to fulfill
  if (t != tail || m == null || h != head)
    continue; // inconsistent read

    Object x = m.item;
    if (isData == (x != null) || // m already fulfilled
      x == m || // m cancelled
      !m.casItem(x, e)) { // 交接的核心语句
        advanceHead(h, m); // dequeue and retry
        continue;
      }

  advanceHead(h, m); // successfully fulfilled
  LockSupport.unpark(m.waiter);
  return (x != null) ? (E)x : e;
}

交接最核心的其实就是 m.casItem(x, e)。交接成功,大家各回各家了。

整体的流程如下:

  1. 开始就是个经典链表开局,head = tail

  2. 陆续开始有节点链接,put 的时候,isData = true;take 的时候,isData = false

  3. 可能会同时有很多的 put 操作,没有对应的 take 操作,他们就按照次序一个个链接起来,形成链表,并通过 awaitFulfill 方法等着对应的 take

  4. 也可能同时会有很多的 take 操作,而没有对应的 put 操作,会形成链表,并通过 awaitFulfill 方法等着对应的 put

  5. take 操作会从链表头开始找匹配的 put,然后通过 casItem 方法交接

  6. put 操作会从链表头开始找匹配的 take,然后通过 casItem 方法交接

所以,SynchronousQueue 你可以看到了,专门就是搞交接任务。

  • put 的哥们发现没人 take,就等在那里,等着take操作。
  • take的哥们儿发现没人put,也会等在那里,等着put操作。

这就是我们的 SynchronousQueue 钟点房做的事情。

OK,钟点房既然开门做生意,它也要赚钱的嘛。所以,它还得搞搞 VIP 客户收费,也得为 VIP 客户搞一些优待。

对于这些 VIP 客人,我们的 Transfer 经理会特意安排下,以栈的形式来安排客人,越后来的客人越大牌儿。所以,自然是后来的客人会优先搞定交接了。这里简短的介绍下,就不再赘述了。

Transfer 化身成 TransferStack,后来的优先服务。

  1. 开始自然是链表开局,一个无意义的链表头指向了 null

  2. 发现链表是空了,二话不说,客官,您进来先啦

  3. 和 TransferQueue 一样,如果都是 take 过来,模式就是 REQUEST,就得排队了

  4. 交接人出现,哥们可以收摊儿了

  5. 其余的不说了,一样的,说多了没劲

话说,大刘搞清楚了这些细节之后,次日,当巴掌张再次进行挑衅时,大刘彻底稳下来了。

当挨个把细节讲的一清二楚之后,看着巴掌张那张落寞的巴掌脸,瞬间也不觉得像巴掌了,而是像是在猜拳中出的石头剪刀布中的布。大刘没忍住,对着这个布比划出了个剪刀,光荣的结束了战斗。

大刘依然在技术流中独占鳌头。

我们下篇大刘的故事见。


你好,我是四猿外。

一家上市公司的技术总监,管理的技术团队一百余人。

我从一名非计算机专业的毕业生,转行到程序员,一路打拼,一路成长。

我会通过公众号,
把自己的成长故事写成文章,
把枯燥的技术文章写成故事。

我建了一个读者交流群,里面大部分是程序员,一起聊技术、工作、八卦。欢迎加我微信,拉你入群。

hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » 啥?SynchronousQueue和钟点房一个道理