读写锁简介

  对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了。

  读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:

   读写锁的三个重要特性:

    ①. 公平选择权:支持非公平(默认)和公平的锁获取方式,非公平锁吞吐量由于公平锁。

    ②. 重进入:读锁和写锁都支持线程重进入。

    ③. 锁降级:遵循获取写锁、获取读锁、释放写锁的次序,写锁能够降级成为读锁。

源码解读

ReentrantReadWriteLock类的整体结构:

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

    // 读锁 
    private final ReentrantReadWriteLock.ReadLock readerLock;

    // 写锁 
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    // 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock 
    public ReentrantReadWriteLock() {
        this(false);
    }

    // 使用给定的公平策略创建一个新的 ReentrantReadWriteLock 
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    // 返回用于写入操作的锁 
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

    // 返回用于读取操作的锁
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
   // 继承AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {}
   // 非公平锁
    static final class NonfairSync extends Sync {}
   // 公平锁
    static final class FairSync extends Sync {}
   // 读锁
    public static class ReadLock implements Lock, java.io.Serializable {}
   // 写锁
    public static class WriteLock implements Lock, java.io.Serializable {}
}

类的继承关系

  public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}

  ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化。

类的内部类

ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。

说明:如上图所示,Sync继承AQS、NonfairSync和FairSync继承自Sync类;ReadLock和WriteLock实现了Lock接口。


 Sync类

(1)类的继承关系

  Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持

(2)类的构造器

Sync() {
    // 本地线程计数器 readHolds
= new ThreadLocalHoldCounter();
    // 设置AQS的状态 setState(getState());
// 确保readhold的可见性 }

 (3)类的属性

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 版本序列号
    private static final long serialVersionUID = 6317671515068378041L;
    // 高16位为读锁,低16位为写锁
    static final int SHARED_SHIFT   = 16;
    // 读锁单位。SHARED_SHIFT * 2
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 读锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁最大数量
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 本地线程计数器
    private transient ThreadLocalHoldCounter readHolds;
    // 缓存的计数器
    private transient HoldCounter cachedHoldCounter;
    // 第一个读线程
    private transient Thread firstReader = null;
    // 第一个读线程的计数
    private transient int firstReaderHoldCount;
}

 (4)内部类

// 计数器
static
final class HoldCounter {
     // 计数
int count = 0; // Use id, not reference, to avoid garbage retention
     // 获取当前线程的TID属性的值 final long tid = getThreadId(Thread.currentThread()); }
// 本地线程计数器
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    // 重写初始化方法,在没有进行set的情况,获取的都是该HoldCounter值
public HoldCounter initialValue() { return new HoldCounter(); } }

 HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。

ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

(5)类的方法

// 返回读锁线程数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回写锁线程数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的第十六位表示写锁数量。

 写锁的获取

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread(); //当前线程
            int c = getState();  //获取状态
            int w = exclusiveCount(c);  //写线程数量
        // 当同步状态state != 0,则已有线程获取读锁或写锁
            if (c != 0) {
                // 如果写锁状态为0,说明读锁此时被占用 则返回false;如果写锁状态不为0,且写锁没有被当前线程持有 则返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) //判断同一线程获取写锁是否超过最大次数(65535),也算可重入
                    throw new Error("Maximum lock count exceeded");
                // 更新状态
                setState(c + acquires);
                return true;
            }
         // 到这里说明c=0,读/写锁都没有被获取。 判断是否正在阻塞 或 CAS更新状态失败
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);  //设置锁为当前线程所有
            return true;
        }

 获取写锁的步骤如下:

方法流程图:


 写锁的释放

protected final boolean tryRelease(int releases) {
        // 锁不是当前线程持有者
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
        // 写锁的新线程数。如果重入了几次,就要执行几次释放
            int nextc = getState() - releases;
        // 如果写(独占)模式重入数为0了,说明独占模式被释放
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null); //写锁释放完成,设置锁的持有者为null
            setState(nextc);  //更新重入数
            return free;
        }

写锁释放过程:

  1. 首先查看当前线程是否为写锁的持有者,如果不是抛出异常。

  2. 然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。

说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。

其方法流程图如下。


读锁的获取

// 从名称可见,读锁为共享锁,可被多个线程持有
protected final int tryAcquireShared(int unused) {
            // 当前线程
            Thread current = Thread.currentThread();
        // 获取状态
            int c = getState();
        // 如果写锁线程数 !=0,且独占锁不是当前线程 返回false。 因为存在锁降级
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
                return -1;
        // 读锁数量
            int r = sharedCount(c);
         // 读锁是否被阻塞 && 线程数小于最大值 && CAS设置成功
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
          // r == 0, 表示第一个读锁线程,首个读锁firstRead不会加入到readHolds中
                if (r == 0) {  //读锁数量0
                    firstReader = current; //设置第一个线程
                    firstReaderHoldCount = 1;  //读锁占用资源数为1
                } else if (firstReader == current) {  //当前线程为第一个读线程,即线程重入
                    firstReaderHoldCount++; //占用资源数加1
                } else { //读锁数量不为0,且不是当前线程
             // 获取计数器
                    HoldCounter rh = cachedHoldCounter;
             //计数器为空 || 计数器tid不是当前线程的tid
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();  //获取当前线程的计数器
                    else if (rh.count == 0)  //计数为0
                        readHolds.set(rh);  //加入readHolds中
                    rh.count++;
                }
                return 1;
            }
         //三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。
            return fullTryAcquireShared(current);
        }

fullTryAcquireShared()方法

final int fullTryAcquireShared(Thread current) {
            //
            HoldCounter rh = null;
            for (;;) {
                int c = getState(); //获取状态
          //写线程数不为0
                if (exclusiveCount(c) != 0) {
             // 不为当前线程 
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) { // 写线程数量为0,且读线程被阻塞
                    // 当前线程是第一个度线程
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else { //当前线程不是第一个读线程
                        if (rh == null) { //计数器为空
                            rh = cachedHoldCounter;
                  // 计数器为空 或者 计数器的tid不为当前正在运行的线程的tid
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)  // 读锁数为最大值,异常
                    throw new Error("Maximum lock count exceeded");
          // CAS成功
                if (compareAndSetState(c, c + SHARED_UNIT)) {
             // 读数量为0
                    if (sharedCount(c) == 0) {
                        firstReader = current; // 设置第一个读线程
                        firstReaderHoldCount = 1; //读线程占用资源数
                    } else if (firstReader == current) { // 读线程重入
                        firstReaderHoldCount++;
                    } else { //读锁数量不为0,并且不为当前线程
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) // 计数器为空 || 计数器的tid不为当前线程的tid
                            rh = readHolds.get();  //获取当前线程的计数器
                        else if (rh.count == 0)  //计数为0
                            readHolds.set(rh);  //加入到readHolds中
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

读写锁取锁过程:

  1. 首先判断写锁是否为0,,且当前线程不占有独占锁(写锁),之间返回;

  2. 否则,判断读线程是否被阻塞 && 读锁数小于最大值 && CAS成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;

  3. 若当前线程线程为第一个读线程,则增加firstReaderHoldCount;

  4. 否则,将设置当前线程对应的HoldCounter对象的值。

流程图:


读锁的释放

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread(); //当前线程
            if (firstReader == current) { // 当前线程是否为第一个读线程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)  //读线程占用资源数为1
                    firstReader = null;
                else  //减少占用的资源
                    firstReaderHoldCount--;
            } else {  // 当前线程不是第一个读线程
          // 获取缓存的计数器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))  //计数器为空 || 计数器的tid不为当前正在运行的线程的tid
                    rh = readHolds.get(); // 获取当前线程的计数器
                int count = rh.count;  // 获取计数
                if (count <= 1) {  // 计数小于等于1
                    readHolds.remove(); //移除
                    if (count <= 0) //计数小于等于0,异常
                        throw unmatchedUnlockException();
                }
                --rh.count;  // 减少计数
            }
            for (;;) {
                int c = getState();  // 获取状态
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

读锁释放过程:

  1.  首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空。

    否则,将第一个读线程占有的资源数firstReaderHoldCount减1;

  2.  若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器.

       如果计数器的计数count小于等于1,则移除当前线程对应的计数器;如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。

流程图


对于非公平/公平内部类、读/写锁内部类的方法,大多都会转到调用Sync内部类的方法。

除内部类的方法外的其他方法,都是一些基本信息:读线程数、写线程数、是否被加读/写锁等等,不是很难 结合源码自行查看。


图解重要函数及对象关系

AQS图解

 读写锁的加锁解锁操作

从图中可见操作最终都是调用ReentrantReadWriteLock类的内部类Sync提供的方法。


AQS无锁状态


 AQS写锁无等待状态                                      AQS写锁重入状态

  

 AQS写锁等待状态


 AQS读锁无等待状态(首节点)                                AQS读锁重入状态(首节点)

  

AQS读锁无等待状态(非首节点)                                AQS读锁等待状态(非首节点)

  

读锁获取添加等待队列                                    写锁获取添加等待队列

  

参考:https://segmentfault.com/a/1190000015768003

12-21 20:05