前言:

  前面讲完了一些并发编程的原理,现在我们要来学习的是线程之间的协作。通俗来说就是,当前线程在某个条件下需要等待,不需要使用太多系统资源。在某个条件下我们需要去唤醒它,分配给它一定的系统资源,让它继续工作。这样能更好的节约资源。

一、Object的wait()与notify()

  基本概念:

    一个线程因执行目标动作的条件未能满足而被要求暂停就是wait,而一个线程满足执行目标动作的条件之后唤醒被暂停的线程就是notify。

  基本模板:

synchronized (obj){
            //保护条件不成立
            while(flag){
                //暂停当前线程
                obj.wait();
            }
            //当保护条件成立,即跳出while循环执行目标动作
            doAction();
        }

  解析wait():Object.wait()的作用是使执行线程被暂停,该执行线程生命周期就变更为WAITING,这里注意一下,是无限等待,直到有notify()方法通知该线程唤醒。Object.wait(long timeout)的作用是使执行线程超过一定时间没有被唤醒就自动唤醒,也就是超时等待。Object.wait(long timeout,int naous)是更加精准的控制时间的方法,可以控制到毫微秒。这里需要注意的是wait()会在当前线程拥有锁的时候才能执行该方法并且释放当前线程拥有的锁,从而让该线程进入等待状态,其他线程来尝试获取当前锁。也就是需要申请锁与释放锁。

  解析notify():Object.notify()方法是唤醒调用了wait()的线程,只唤醒最多一个。如果有多个线程,不一定能唤醒我们所想要的线程。Object.notifyAll()唤醒所有等待的线程。notify方法一定是通知线程先获取到了锁才能进行通知。通知之后当前的通知线程需要释放锁,然后由等待线程来获取。所以涉及到了一个申请锁与释放锁的步骤。

  wait()与notify()之间存在的三大问题:

  从上面的解析可以看出,notify()是无指向性的唤醒,notifyAll()是无偏差唤醒。所以会产生下面三个问题

  过早唤醒:假设当前有三组等待(w1,w2,w3)与通知(n1,n2,n3)线程同步在对象obj上,w1,w2的判断唤醒条件相同,由线程n1更新条件并唤醒,w3的判断唤醒条件不同,由n2,n3更新条件并唤醒,这时如果n1执行了唤醒,那么不能执行notify,因为需要叫醒两条线程,只能用notifyAll(),可是用了之后w3的条件未能满足就被叫醒,就需要一直占用资源的去等待执行。

  信号丢失:这个问题主要是程序员编程出现了问题,并不是内部实现机制出现的问题。编程时如果在该使用notifyAll()的地方使用notify()那么只能唤醒一个线程,从而使其他应该唤醒的线程未能唤醒,这就是信号丢失。如果等待线程在执行wait()方法前没有先判断保护条件是否成立,就会出现通知线程在该等待线程进入临界区之前就已经更新了相关共享变量,并且执行了notify()方法,但是由于wait()还未能执行,且没有设置共享变量的判断,所以会执行wait()方法,导致线程一直处于等待状态,丢失了一个信号。

  欺骗性唤醒:等待线程并不是一定有notify()/notifyAll()才能被唤醒,虽然出现的概率特别低,但是操作系统是允许这种情况发生的。

  上下文切换问题:首先wait()至少会导致线程对相应对象内部锁的申请与释放。notify()/notifyAll()时需要持有相应的对象内部锁并且也会释放该锁,会出现上下文切换问题其实就是从RUNNABLE状态变为非RUNNABLE状态会出现

  针对问题的解决方案:

  信号丢失与欺骗性唤醒问题:都可以使用while循环来避免,也就是上面的模板中写的那样。

  上下文切换问题:在保证程序正确性的情况下使用notify()代替notifyAll(),notify不会导致过早唤醒,所以减少了上下文的切换。并且使用了notify之后应该尽快释放相应内部锁,从而让wait()能够更快的申请到锁。

  过早唤醒:使用java.util.concurrent.locks.Condition中的await与signal。

  PS:由于Object中的wait与notify使用的是native方法,即C++编写,这里不做源码解析。

二、Condition中的await()与signal()

  这个方法相应的改变了上面所说的无指向性的问题,每个Condition内部都会维护一个队列,从而让我们对线程之间的操作更加灵活。下面通过分析源码让我们了解一下内部机制。Condition是个接口,真正的实现是AbstractQueuedSynchronizer中的内部类ConditionObject。

  基本属性:

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

  从基本属性中可看出维护的是双端队列。

  await()方法解析:

public class ConditionObject implements Condition, java.io.Serializable {
  public final void await() throws InterruptedException {
   // 1. 判断线程是否中断
    if(Thread.interrupted()){
        throw new InterruptedException();
    }
   // 2. 将线程封装成一个 Node 放到 Condition Queue 里面
    Node node = addConditionWaiter();
   // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)              
    int savedState = fullyRelease(node);
    int interruptMode = 0;
   // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 
   //(1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
while(!isOnSyncQueue(node)){      // 5. 当前线程没在 Sync Queue 里面, 则进行 block LockSupport.park(this);      // 6. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移;
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){      // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面 break; } }    // 7. 调用 acquireQueued在 Sync Queue 里面进行独占锁的获取, 返回值表明在获取的过程中有没有被中断过 if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ interruptMode = REINTERRUPT; }    // 8. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal。
   //因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
if(node.nextWaiter != null){     // 9. 进行 cancelled 节点的清除 unlinkCancelledWaiters(); }    // 10. "interruptMode != 0" 代表通过中断的方式唤醒线程 if(interruptMode != 0){      // 11. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下 reportInterruptAfterWait(interruptMode); }   } }

  上面源代码可看出Condition内部维护的队列是一个等待队列,当需要调用signal()方法时就会让当前线程节点从Condition queue转到Sync queue队列中去竞争锁从而唤醒。

  signal()源码解析:

public class ConditionObject implements Condition, java.io.Serializable {
    public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    private void doSignal(Node first) {
            do {
                //传入的链表下一个节点为空,则尾节点置空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //当前节点的下一个节点为空
                first.nextWaiter = null;
                //如果成功将node从condition queue转换到sync queue,则退出循环,节点为空了也退出循环。否则就接着在队列中找寻节点进行唤醒
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
} 

  signal()会使等待队列中的一个任意线程被唤醒,signalAll()则是唤醒该队列中的所有线程。这样通过不同队列维护不同线程,就可以达到指向性的功能。可以消除由过早唤醒带来的资源损耗。注意的是在使用signal()方法前需要获取锁,即lock(),而后需要尽快unlock(),这样可以避免上下文切换的损耗。

总结:

  面向对象的世界中,一个类往往需要借助其他的类来一起完成计算,同样线程的世界也是,多个线程可以同时完成一个任务,通过唤醒与等待,能更好的操作线程,从而让线程在需要使用资源的时候分配资源给它,而不使用资源的时候就可以将资源让给其他线程操作。关于Condition中提到的Sync queue可参考Java并发——结合CountDownLatch源码、Semaphore源码及ReentrantLock源码来看AQS原理来看内部维护的队列是如何获取锁的。

01-22 06:01