JUC 源码学习 CountDownLatch 源码学习
1.CountDownLatch
构造方法
ountDownLatch countDownLatch = new CountDownLatch(n);
public CountDownLatch(int count) {
//如果构造参数少于0,抛出异常
if (count < 0) throw new IllegalArgumentException("count < 0");
调用Sync的构造方法
this.sync = new Sync(count);
}
//AQS的子类
private static final class Sync extends AbstractQueuedSynchronizer {
//调用父类的set方法,
Sync(int count) {
setState(count);
}
}
state是一个volatile类型的变量,保证了对它的读写操作的可见性,以及禁止了指令重排序,从而确保了线程之间对state的操作是正确的和可靠的。
//设置同步状态state的值
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
await()方法
countDownLatch.await()
acquireSharedInterruptibly 是AQS提供的一个方法,用于以可中断的方式获取共享锁
sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果当前线程被中断,抛出响应中断异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
CountDownLatch重写了父类的tryAcquireShared 方法 。如果当前同步状态state的值 不为0 则返回 -1 ,否则返回 1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
当未调用countDown()方法时,CountDownLatch 初始化后的state值通常不等于0;
调用AQS.doAcquireSharedInterruptibly 方法,尝试可中断获取共享锁
doAcquireSharedInterruptibly(arg);
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建一个新的节点 放在队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前节点
final Node p = node.predecessor();
//如果前一个节点为头节点
if (p == head) {
//尝试获取线程资源。成功返回1 失败返回-1
int r = tryAcquireShared(arg);
//肯定返回-1
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//当获取线程资源失败后挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
//只有当前一个方法返回true,才会执行此方法
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AbstractQueuedSynchronizer.NODE是Java中AbstractQueuedSynchronizer(AQS)类中定义的一个枚举类型。它表示等待队列中的节点的状态。
AbstractQueuedSynchronizer.NODE的枚举值有以下几个意思:
- CANCELLED(已取消):表示节点已被取消。当一个线程等待的操作被取消或中断时,对应的节点的状态会被设置为CANCELLED。
- SIGNAL(信号):表示节点正在等待某个条件的发生。当一个节点处于等待状态时,它的状态会被设置为SIGNAL。
- CONDITION(条件):表示节点在条件队列中。当一个线程通过Condition对象的await方法进入等待状态时,对应的节点会被移动到条件队列中,并将其状态设置为CONDITION。
- PROPAGATE(传播):表示节点需要将唤醒信号传播给后继节点。在某些情况下,一个节点需要将唤醒信号传递给后继节点,以便唤醒更多的等待线程。
这些枚举值用于表示等待队列中节点的不同状态,通过判断节点的状态,可以进行相应的处理和操作,实现线程的同步和协作。在具体的使用场景中,可以根据节点的状态来决定是否需要阻塞线程、唤醒线程或传播唤醒信号。
循环执行 新节点的线程状态state为0,将在第一次循环时设置为 Node.SIGNAL(-1),进入等待信号状态,
第二次循环时将进入第一个判断,返回true 则进入逻辑与的第二个方法 parkAndCheckInterrupt()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前一个节点的状态为等待唤醒,那么当前节点肯定需要挂起,返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//当前线程可以挂起
return true;
//如果前节点的线程等待状态已经被取消,则需要从等待队列(双向链表)中剔除
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//尝试将前一个节点的线程等待状态修改为 等待信号
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//当前线程不能挂起
return false;
}
挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
countDown() 方法
sync.releaseShared(1);
调AQS.releaseShared 方法
public final boolean releaseShared(int arg) {
//释放线程资源 state - arg
if (tryReleaseShared(arg)) {
//
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//cas 尝试将当前线程状态state的值减一
if (compareAndSetState(c, nextc))
//如果 = 0 ,表示可以获取其他线程可以获取线程资源了
return nextc == 0;
}
}
如果state - 1 (每次调用countDown()将减去 一 )
private void doReleaseShared() {
for (;;) {
//处理头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果头节点处于等待信号状态
if (ws == Node.SIGNAL) {
//反复尝试修改,释放线程资源 将线程状态state值设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//修改线程状态成功后next节点线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//头节点状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取next节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒
LockSupport.unpark(s.thread);
}