ReentrantReadWriteLock源码解析

NonfairSync ReentrantReadWriteLock

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; //非公平实现,总是返回false 不会阻塞
    }
    final boolean readerShouldBlock() {
         //队列的head后面的节点不为空且为非共享的就返回true 就阻塞
        return apparentlyFirstQueuedIsExclusive();
    }
}

FairSync ReentrantReadWriteLock

//查看队列是否有先驱
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

ReentrantReadWriteLock的同步实现Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
   private static final long serialVersionUID = 6317671515068378041L;

   /*使用一个32位的int类型来表示被占用的线程数
    加锁的区域被分为两部分,低16位代表独占锁的被同一个线程申请的次数
    高位代表共享读锁占有的线程数量*/
   static final int SHARED_SHIFT   = 16;//读锁占用的位数
   static final int SHARED_UNIT    = (1  SHARED_SHIFT);增加一个读锁相当于增加一个SHARED_UNIT
   static final int MAX_COUNT      = (1  SHARED_SHIFT) - 1;//申请读锁的最大线程数量 65535
   static final int EXCLUSIVE_MASK = (1  SHARED_SHIFT) - 1;//计算写锁的具体值时,该值为15个1,

   //计算申请读锁的线程数,将c向右移位16就得到高16位的值  
   static int sharedCount(int c)    { return c  SHARED_SHIFT; }
   //计算申请写锁的数量,将c和16个1相与得到低16位
   static int exclusiveCount(int c) { return c  EXCLUSIVE_MASK; }

   //每个线程有多少读锁
   static final class HoldCounter {
       int count = 0;
       // 使用线程id来避免垃圾保留
       final long tid = getThreadId(Thread.currentThread());
   }

   /**
    * ThreadLocal subclass. Easiest to explicitly define for sake
    * of deserialization mechanics.
    */
   static final class ThreadLocalHoldCounter
       extends ThreadLocalHoldCounter {
       public HoldCounter initialValue() {
           return new HoldCounter();
       }
   }

   //当前线程的可重入读锁的数量,无论什么时候一个线程的readHolds变成0就释放锁
   private transient ThreadLocalHoldCounter readHolds;

   private transient HoldCounter cachedHoldCounter;

    //第一个获取读锁的线程
   private transient Thread firstReader = null;
   private transient int firstReaderHoldCount;

   Sync() {
       readHolds = new ThreadLocalHoldCounter();
       setState(getState()); // ensures visibility of readHolds
   }
   abstract boolean readerShouldBlock();
   abstract boolean writerShouldBlock();

   protected final boolean tryRelease(int releases) {
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       int nextc = getState() - releases;
       boolean free = exclusiveCount(nextc) == 0;//申请写锁的数量
       if (free)
           setExclusiveOwnerThread(null);
       setState(nextc);
       return free;
   }

   protected final boolean tryAcquire(int acquires) {
       /*
        * Walkthrough:
        * 1.读锁的数量非0或者写锁的数量非0,并且锁的拥有者是其他线程,获取失败
        * 2. 如果锁的数量饱和大于65535,失败
        * 3.
        */
       Thread current = Thread.currentThread();
       int c = getState();//
       int w = exclusiveCount(c);//申请写锁的数量
       if (c != 0) {
           // (Note: if c != 0 and w == 0 读锁数量不为0,独占锁获取失败)
           if (w == 0 || current != getExclusiveOwnerThread())//条件1
               return false;
           if (w + exclusiveCount(acquires)  MAX_COUNT)//条件2
               throw new Error("Maximum lock count exceeded");
           // 可重入
           setState(c + acquires);
           return true;
       }
       //c==0,说明当前没有锁
       if (writerShouldBlock() ||//这个实现在子类中,根据是否公平区分
           !compareAndSetState(c, c + acquires))
           return false;
       setExclusiveOwnerThread(current);
       return true;
   }
//体现firstReader和cachedHoldCounter作用,减少调用readHolds的get方法次数
   protected final boolean tryReleaseShared(int unused) {
       Thread current = Thread.currentThread();
       if (firstReader == current) {
           // assert firstReaderHoldCount  0;
           if (firstReaderHoldCount == 1)
               firstReader = null;
           else
               firstReaderHoldCount--;
       } else {
           HoldCounter rh = cachedHoldCounter;
           //判断当前线程是否是cachedHoldCounter,
           //不是就调用readHold的get方法获取
           if (rh == null || rh.tid != getThreadId(current))
               rh = readHolds.get();//从ThreadLocalMap中获取当前线程
           int count = rh.count;
           if (count = 1) {
               readHolds.remove();//ThreadLocalMap中移除当前线程
               if (count = 0)
                   throw unmatchedUnlockException();
           }
           --rh.count;//读锁数量减一
       }
       for (;;) {
           int c = getState();
           int nextc = c - SHARED_UNIT;
           if (compareAndSetState(c, nextc))
               // 释放读锁并且不会影响到其他读者
               //但是他可能允许等待的写者运行,如果读 写锁都是空闲的
               return nextc == 0;
       }
   }


   protected final int tryAcquireShared(int unused) {
       /*
        * 1. If write lock held by another thread, fail.如果写锁被其他线程占有,失败
        * 2. 
        */
       Thread current = Thread.currentThread();
       int c = getState();
       if (exclusiveCount(c) != 0 
           getExclusiveOwnerThread() != current)
           return -1;
       int r = sharedCount(c);
       if (!readerShouldBlock() 
           r  MAX_COUNT 
           compareAndSetState(c, c + SHARED_UNIT)) {
           if (r == 0) {
               firstReader = current;
               firstReaderHoldCount = 1;
           } else if (firstReader == current) {
               firstReaderHoldCount++;
           } else {
               HoldCounter rh = cachedHoldCounter;
               if (rh == null || rh.tid != getThreadId(current))
                   cachedHoldCounter = rh = readHolds.get();
               else if (rh.count == 0)
                   readHolds.set(rh);
               rh.count++;
           }
           return 1;
       }
       return fullTryAcquireShared(current);
   }
   /**
1.获取当前是否已经被锁住,如果c!=0 判断写锁的申请数量是否为0,是则转2
不是就判断是否是当前线程占用写锁,不是返回false是则转2
2.判断写锁的数量是否超过最大值,是则抛出错误,不是就转3
3.cas的设置c失败则返回false,否则设置当前线程独占锁,返回true
    */
   final boolean tryWriteLock() {
       Thread current = Thread.currentThread();
       int c = getState();
       if (c != 0) {
           int w = exclusiveCount(c);
           if (w == 0 || current != getExclusiveOwnerThread())
               return false;
           if (w == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
       }
       if (!compareAndSetState(c, c + 1))
           return false;
       setExclusiveOwnerThread(current);
       return true;
   }

   /**
    1.写锁有没有被占用,没有被占用转2,被占用判断释放是当前线程占用
    如果是当前线程占用,转2,不是返回false
    2.判断读锁的最大数量有没有超过最大数量,有抛出错误,没有就转3
    3.cas设置state更新firstReader firstReaderHoldCount cachedHoldCounter readHolds
    */
   final boolean tryReadLock() {
       Thread current = Thread.currentThread();
       for (;;) {
           int c = getState();
           if (exclusiveCount(c) != 0 
               getExclusiveOwnerThread() != current)
               return false;
           int r = sharedCount(c);
           if (r == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
           if (compareAndSetState(c, c + SHARED_UNIT)) {
               if (r == 0) {
                   firstReader = current;
                   firstReaderHoldCount = 1;
               } else if (firstReader == current) {
                   firstReaderHoldCount++;
               } else {
                   HoldCounter rh = cachedHoldCounter;
                   if (rh == null || rh.tid != getThreadId(current))
                       cachedHoldCounter = rh = readHolds.get();
                   else if (rh.count == 0)
                       readHolds.set(rh);
                   rh.count++;
               }
               return true;
           }
       }
   }
   final int getCount() { return getState(); }
}

小结:


  • 读锁是可以多个线程重入的,写锁只能是单个线程重入
  • 一个拥有写锁的线程可以拥有读锁,写锁降级为读锁
  • 同时占有读锁和写锁的线程释放了写锁就降为读锁,此时写操作是无法重入的
    如果写锁还未完全释放是可以重入的
  • 公平模式下读写锁的顺序必须按照AQS的队列顺序进行,非公平模式下要看
    head的下一个节点是否是独占的

最新回复(0)
/jishud_2F8GGLvNWHyvyOpDF88GUca3IOmKwOh8ZFZHXg_3D_3D4794838
8 简首页