大家都用过ReentrantLock,但是大家对内部实现是否足够了解呢,下面我就简单说一下其中的实现原理。

  ReentrantLock是可重入锁,也就是同一个线程可以多次获取锁,每获取一次就会进行一次计数,解锁的时候就会递减这个计数,直到计数变为0。

  它有两种实现,一种是公平锁,一种是非公平锁,那么默认是什么锁呢?看完如下代码想必你也知道了。  

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

  它的内部结构的实现是如何的呢? 首先NonFairSync类是静态内部类,它继承了Sync。

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync

  Sync继承了AbstractQueuedSynchronizer,简称AQS。同时Sync里边实现了tryRelease方法,因为公平锁和非公平锁都可以用这个方法释放锁。

/**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer 

  继续看非公平锁的lock方法,采用CAS进行当前状态的设置state=0,表示没有线程占用,state=1表示已经有现成占用了,设置成功了,将当前线程设置为线程拥有者,并且是排他的。如果有现成占用了,那么需要进入acquire(1),需要获取一个锁。

/**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

  acquire方法首先进行tryAcquire,尝试获取锁,即调用nonfairTryAcquire,判断当前锁是否state=0, 则没有现成占用,则进行设置。如果被占用了判断该线程是否是当前线程占用的,如果是的话,那么可以进行重入,即当前可以获取锁,计数器进行加1。否则的话返回失败。返回失败后执行addWaiter方法,也就是添加到等待的队列。Node是一个双向列表,也就是把需要等待的线程放到放到Node,并且链接起来。

            /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  接下来看一下Node的大致内容,有两个指针,一个是prev,一个是next,还保存着当前的线程。同时里边还有一个共享锁和独占锁,SHARED和EXCLUSIVE。ReentrantLock采用的就是独占锁。Semaphore,CountDownLatch等采用的是共享锁,即有多个线程可以同时获取锁。

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

  大致的思路我们看了一下,总体的流程图我画了一下。ReentrankLock内核采用的是AQS实现的,AQS里边采用的是双向链表,即如果当前线程未获取到锁将会加入到链表中。

聊聊ReentrantLock的内部实现-LMLPHP

  那么公平锁和非公平锁的实现的不同点在哪里呢?公平锁和非公平锁就差在 !hasQueuedPredecessors() ,也就是前边没有排队者的话,我就可以获取锁了。

/**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  如果当前线程之前还有线程等待就会返回true,如果当前节点是头结点,或者当前队列为空就会返回false。非公平锁没有这句话的判断,所以直接去竞争锁。

   /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     *  <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     *  <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

  这个就是ReentrantLock的基本原理,接下来咱们继续看看与之一块使用的Condition。Condition是一个接口,它的实现类是ConditionObject。调用await的时候也会将当前线程的一些信息加入到队列当中。ConditionObject中有一个firstWaiter和LastWaiter分别指向的了等待队列的头和尾。

聊聊ReentrantLock的内部实现-LMLPHP

  当调用Condition的signal方法是,则会将第一个Node转换到同步队列,如下图所示。聊聊ReentrantLock的内部实现-LMLPHP

  好了,总结一下:

  1. ReentrankLock默认是非公平锁。

  2.ReentrankLock的内部实现采用的AQS的双向链表实现。获取锁的线程会被封装成Node里边,供后续使用。

  3.公平锁采用判断当前Node是不是头结点,如果是的话就获取锁并做业务处理,不是头结点的不能获取所。

  4.非公平锁没有判断当前结点,采用CAS,谁第一个拿到了state=0,则视为获取锁。

  5.Condition的await和notify也采用类似的机制,当执行await是,会将当前线程信息的相关信息放入到Node的列表,记录firstWaiter和lastWaiter指向的信息。

  

  希望对大家有所帮助,如果有问题的请及时指出。

  

03-11 03:00