JUC 源码学习 CountDownLatch 源码学习

1.CountDownLatch

构造方法

ountDownLatch countDownLatch = new CountDownLatch(n);
  public CountDownLatch(int count) {
  			//如果构造参数少于0,抛出异常
        if (count < 0) throw new IllegalArgumentException("count < 0");
        调用Sync的构造方法
        this.sync = new Sync(count);
    }
//AQS的子类
private static final class Sync extends AbstractQueuedSynchronizer {
       	//调用父类的set方法,
        Sync(int count) {
            setState(count);
        }
}

state是一个volatile类型的变量,保证了对它的读写操作的可见性,以及禁止了指令重排序,从而确保了线程之间对state的操作是正确的和可靠的。

 	//设置同步状态state的值 
 	private volatile int state;
 
   protected final void setState(int newState) {
        state = newState;
    }

JUC 源码学习 CountDownLatch 源码学习-LMLPHPawait()方法

countDownLatch.await()

acquireSharedInterruptibly 是AQS提供的一个方法,用于以可中断的方式获取共享锁

sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果当前线程被中断,抛出响应中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取共享锁
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

CountDownLatch重写了父类的tryAcquireShared 方法 。如果当前同步状态state的值 不为0 则返回 -1 ,否则返回 1

  protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

当未调用countDown()方法时,CountDownLatch 初始化后的state值通常不等于0;

调用AQS.doAcquireSharedInterruptibly 方法,尝试可中断获取共享锁

 doAcquireSharedInterruptibly(arg);
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建一个新的节点 放在队尾
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            		//获取前节点
                final Node p = node.predecessor();
                //如果前一个节点为头节点
                if (p == head) {
                		//尝试获取线程资源。成功返回1 失败返回-1
                    int r = tryAcquireShared(arg);
                    //肯定返回-1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //当获取线程资源失败后挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                		//只有当前一个方法返回true,才会执行此方法
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

AbstractQueuedSynchronizer.NODE是Java中AbstractQueuedSynchronizer(AQS)类中定义的一个枚举类型。它表示等待队列中的节点的状态。
AbstractQueuedSynchronizer.NODE的枚举值有以下几个意思:

  1. CANCELLED(已取消):表示节点已被取消。当一个线程等待的操作被取消或中断时,对应的节点的状态会被设置为CANCELLED。
  2. SIGNAL(信号):表示节点正在等待某个条件的发生。当一个节点处于等待状态时,它的状态会被设置为SIGNAL。
  3. CONDITION(条件):表示节点在条件队列中。当一个线程通过Condition对象的await方法进入等待状态时,对应的节点会被移动到条件队列中,并将其状态设置为CONDITION。
  4. PROPAGATE(传播):表示节点需要将唤醒信号传播给后继节点。在某些情况下,一个节点需要将唤醒信号传递给后继节点,以便唤醒更多的等待线程。
    这些枚举值用于表示等待队列中节点的不同状态,通过判断节点的状态,可以进行相应的处理和操作,实现线程的同步和协作。在具体的使用场景中,可以根据节点的状态来决定是否需要阻塞线程、唤醒线程或传播唤醒信号。

循环执行 新节点的线程状态state为0,将在第一次循环时设置为 Node.SIGNAL(-1),进入等待信号状态,

第二次循环时将进入第一个判断,返回true 则进入逻辑与的第二个方法 parkAndCheckInterrupt()

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前一个节点的状态为等待唤醒,那么当前节点肯定需要挂起,返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             //当前线程可以挂起
            return true;
        //如果前节点的线程等待状态已经被取消,则需要从等待队列(双向链表)中剔除
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
           	//尝试将前一个节点的线程等待状态修改为 等待信号
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //当前线程不能挂起
        return false;
    }

挂起当前线程

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

JUC 源码学习 CountDownLatch 源码学习-LMLPHP

countDown() 方法

 sync.releaseShared(1);

调AQS.releaseShared 方法

  public final boolean releaseShared(int arg) {
  			//释放线程资源  state - arg
        if (tryReleaseShared(arg)) {
        		//
            doReleaseShared();
            return true;
        }
        return false;
    }
      protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                //cas 尝试将当前线程状态state的值减一
                if (compareAndSetState(c, nextc))
                		//如果 = 0 ,表示可以获取其他线程可以获取线程资源了
                    return nextc == 0;
            }
        }

如果state - 1 (每次调用countDown()将减去 一 )

 private void doReleaseShared() {
        for (;;) {
        		//处理头节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果头节点处于等待信号状态
                if (ws == Node.SIGNAL) {
                		//反复尝试修改,释放线程资源 将线程状态state值设置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //修改线程状态成功后next节点线程
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //头节点状态为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //获取next节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        		//唤醒
            LockSupport.unpark(s.thread);
    }

JUC 源码学习 CountDownLatch 源码学习-LMLPHP

07-04 04:07