JUC实现AQS树:

AQS源码分析-LMLPHP

简介

AQS(AbstractQueuedSynchronizer)抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架;juc包中许多类基于该类实现的,比如:ReentrantLock、CountDownLatch等。
在整个AQS存在两种链表。一个链表就是整个Sync Node链表,横向链表。另一种链表就是Condition的Wait Node链表,相对于Sync node,它属于node节点的一个纵向链表。当纵向列表被single通知后,会进入对应的Sync Node进行排队处理

volatile关键字

  • 原子性(不一定能保证其原子性,如i++)

  • 可见性

  • 顺序性

预留方法介绍

以下5个protected方法,用于client自行实现,进行业务行为控制;juc包下Lock、Futrue很多实现都是基于此扩展的

  • tryAcquire(int):独占方式,尝试获取资源,成功则返回true,失败则返回false

  • tryRelease(int):独占方式,尝试释放资源,成功则返回true,失败则返回false

  • tryAcquireShared(int):共享方式,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源

  • tryReleaseShared(int):共享方式,尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它

独占锁、共享锁案例

  • 以ReentrantLock独占锁为例,state初始化为0,表示未锁定状态。线程Alock时,会调用tryAcquire独占该锁并cas设置state为1。此时其他线程来tryAcquire时会失败,直到线程A调用unlock释放锁将state设置为0为止。当然,在释放锁前,线程A自己可以重复获取此锁的(state累加),这就是可重入概念。但要注意获取多少次就对应需要释放多少次,这样才能保证state回到0状态

  • 以CountDownLatch共享锁为例,任务分为N个子线程执行,state也为N(state要保证和线程个数一致)。这N个子线程是并行执行的,每个子线程执行countDown方法时,state减1再cas设置state,等待所有子线程执行完后(state=0),会unpark()唤醒等待主线程,执行后续操作

Node设计

 static final class Node {
        // 共享锁模式,多个线程可同时执行,如Semaphore/CountDownLatch
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        // 独占锁模式,只有一个线程能执行,如ReentrantLock
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        // waitStatus状态
        // 表示当前节点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        // 代表是需要当前Node节点需要唤起后一个Node节点。在Node节点enqueue时,会设置前一个节点的状态。这样链式的唤醒,完成这样的一个交接棒
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL = -1;
        // 表示节点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取同步锁
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        // 共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继结点(传播)
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        // 状态,新节点默认为0
        volatile int waitStatus;
        // 链表前继节点
        volatile Node prev;
        // 链表后继节点
        volatile Node next;
        // 排队此节点的线程,初始化构造为null
        volatile Thread thread;
        // 一个标志位,就是用于表明是采用的共享锁还是排他锁。同时也是其对应condition队列的引用节点
        Node nextWaiter;
}

Node设计有以下几点需要注意:

  • Node实现了CLH锁;即Craig, Landin, and Hagersten (CLH) locks。CLH锁是一个自旋锁。能确保无饥饿性,提供先来先服务的公平性

  • 是一个FIFO的链表的实现,对于队列的控制经常要做double-check

  • waitStatus负值代表节点处于等待状态,正值代表节点已被取消

入队enqueue分析(double-check)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 将当前node节点的prev指向tail节点
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // 位置1
                // tail节点的下一个节点指向当前node节点
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize //位置3
                if (compareAndSetHead(new Node())) // 位置4
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // 位置2
                    t.next = node;
                    return t;
                }
            }
        }
    }

以上代码标记的4个位置:

  1. node默认的enqueue操作都是接在tail节点之后,在prev节点指定完成之后,会进行一个cas操作,将当前加入的节点作为tail。因为会有并发操作,原先的tail节点会有所变化,位置1处cas出现失败。这样就进入第2步check

  2. 位置2,每次都获取一次当前的tail节点,尝试进行cas操作,将当前node节点作为tail,有并发/竞争导致处理失败,继续重复这一动作,直至cas成功

  3. 在位置1和位置2处理的一种异常情况,就是tail节点为空,有可能当前node节点是第一个进行enqueue,位置4这时需要创建一个新的队列,cas设置一个空的Head Node节点,并将自己作为tail节点。另一种异常情况也有可能是node节点被所有唤醒

  4. 同样考虑并发因素,位置3在处理时,有可能被其他线程已创建好Head Node节点,这样又回到位置2上处理,将node节点添加到tail节点之后

出队dequeue

出库分为两个动作:

1. 动作

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 位置1
                if (p == head && tryAcquire(arg)) { 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) 
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) { // 位置2
        head = node;
        node.thread = null;
        node.prev = null;
    }

以上代码标记的2个位置:

  • 在位置1处,每次获取当前node节点的prev前个节点,判断是否等于head节点。这里head节点比较抽象,有点难理解,可以将head节点理解成一个“虚拟”或者“傀儡”节点,就纯粹代表上一个出库的节点。因为是一个FIFO队列,如果当前node节点的上一个节点已经出库,那就可以轮到自己

  • 在位置2处,轮自己出库,将当前node节点作为head,同时进行gc处理,手动断开node节点和链表的一些关联

2. 动作2

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // 位置1
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

释放锁的时候会调用该方法;位置1处,循环跳过已被取消的节点,最后unpark唤醒对应节点的线程

源码分析

**独占锁**

  • require

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 位置1
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 位置2
                    parkAndCheckInterrupt()) //位置3
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node); // 位置4
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 如果为SIGNAL状态,则需要park
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { // 处理被取消的node节点
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // cas设置上一个节点的waitStatus
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
  1. tryAcquire自定义扩展,一般是对value值就行控制,类似P/V原语控制
    P:申请一个空闲资源(把信号量减1),若成功,则退出;若失败,则该进程被阻塞
    V:释放一个被占用的资源(把信号量加1),如果发现有被阻塞的进程,则选择一个唤醒之

  2. addWaiter,入库将Node添加到链表中,上面入队有讲解

  3. acquireQueued分析:

  • 位置1处,就是对比一下我的上一个节点是否已经出队列,如果已经出队列,就认为当前轮到自己出队列,返回interrupted的标志

  • 位置2处,执行shouldParkAfterFailedAcquire,就是设置一下当前节点的上一个节点的waitStatus状态为SINGLE,让其在出队列的时候能唤醒自己进行处理

  • 在设置了上一个节点为SINGLE后,当前线程就可以进行park,转到阻塞状态,直到等到被唤醒。(唤醒条件有2个:前一个节点的唤醒和Thread.interupte事件)

  • 位置4处,就是一个cancel

  • acquireInterruptibly中断

   public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 位置1
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 位置2
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquire和acquireInterruptibly不同之处,就在于位置1判断线程是否中断,如果中断则抛出异常由调用方处理;位置2处直接抛出InterruptedException中断异常

  • tryAcquireNanos超时

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 计算最后期限时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 计算超时时间
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) // 位置1
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold) // 位置2
                    LockSupport.parkNanos(this, nanosTimeout); // 位置3
                if (Thread.interrupted()) // 位置4
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireNanos支持设置超时时间,在指定时间未获取到锁就返回false

  • 位置1处,如果超时直接返回false

  • 位置2处,如果计算出的超时时间大于自旋锁超时阀值时,则位置3parkNanos阻塞超时时间

  • 位置4处,线程中断直接抛出中断异常

**共享锁**

  • acquireShared

   public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 位置1
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared(); // 位置2
        }
    }
  • 位置1处,设置队列头,并检查后继者是否可能在共享模式下等待,如果正在传播,则传播是否设置为传播> 0或PROPAGATE状态

  • 位置2处,释放锁,出队列的时候,同时唤醒下一个Node

ConditionObject

使用场景:生产者与消费者
整体讲下这个ConditionObject的实现,其实维护两个队列:

  • Condition队列:表示等待的队列,其waitStatus=Node.Condition,由firstWaiter和lastWaiter两个属性操控

  • Sync队列:表示可以竞争锁的队列,这个跟AQS一致,waitStatus=0

  • await()方法:就是把当前线程创建一个Node加入Condition队列,接着就一直循环查其在不在Sync队列,如果当前节点在Sync队列里了,就可以竞争锁,恢复运行了

  • signal()方法:就是把某个节点的nextWaiter设为null,再把其从Condition队列转到Sync队列 

09-27 20:02