概述
ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
使用
示例一:利用重入来执行升级缓存后的锁降级
class CachedData {
Object data;
volatile boolean cacheValid; //缓存是否有效
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock(); //获取读锁
//如果缓存无效,更新cache;否则直接使用data
if (!cacheValid) {
// Must release read lock before acquiring write lock
//获取写锁前须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//锁降级,在释放写锁前获取读锁
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}
use(data);
rwl.readLock().unlock(); //释放读锁
}
}
示例二:使用 ReentrantReadWriteLock 来提高 Collection 的并发性
通常在 collection 数据很多,读线程访问多于写线程并且 entail 操作的开销高于同步开销时尝试这么做。
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock(); //读锁
private final Lock w = rwl.writeLock(); //写锁
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
实现原理
ReentrantReadWriteLock 也是基于AQS实现的,它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
ReentrantReadWriteLock含有两把锁readerLock和writerLock,其中ReadLock和WriteLock都是内部类。
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
写锁的获取与释放(WriteLock)
写锁是一个可重入的独占锁,使用AQS提供的独占式获取同步状态的策略。
(一)获取写锁
//获取写锁
public void lock() {
sync.acquire(1);
}
//AQS实现的独占式获取同步状态方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//自定义重写的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); //取同步状态state的低16位,写同步状态
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//存在读锁或当前线程不是已获取写锁的线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//判断同一线程获取写锁是否超过最大次数,支持可重入
if (w + exclusiveCount(acquires) > MAX_COUNT) //
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//此时c=0,读锁和写锁都没有被获取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
从源代码可以看出,获取写锁的步骤如下:
1)判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行2);否则执行5)。
2)判断同步状态state的低16位(w)是否为0。如果w=0,说明其他线程获取了读锁,返回false;如果w!=0,说明其他线程获取了写锁,执行步骤3)。
3)判断获取了写锁是否是当前线程,若不是返回false,否则执行4);
4)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程以获取写锁,更新是线程安全的),返回true。
5)此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,否则返回false。如果需要阻塞则返回false。
writerShouldBlock() 表示当前线程是否应该被阻塞。NonfairSync和FairSync中有不同是实现。
//FairSync中需要判断是否有前驱节点,如果有则返回false,否则返回true。遵循FIFO
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
//NonfairSync中直接返回false,可插队。
final boolean writerShouldBlock() {
return false; // writers can always barge
}
(二)释放写锁
//写锁释放
public void unlock() {
sync.release(1);
}
//AQS提供独占式释放同步状态的方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//自定义重写的tryRelease方法
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; //同步状态减去releases
//判断同步状态的低16位(写同步状态)是否为0,如果为0则返回true,否则返回false.
//因为支持可重入
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc); //以获取写锁,不需要其他同步措施,是线程安全的
return free;
}
读锁的获取与释放(ReadLock)
读锁是一个可重入的共享锁,采用AQS提供的共享式获取同步状态的策略。
(一)获取读锁
public void lock() {
sync.acquireShared(1);
}
//使用AQS提供的共享式获取同步状态的方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//自定义重写的tryAcquireShared方法,参数是unused,因为读锁的重入计数是内部维护的
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c)取低16位写锁。存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c); //取高16位读锁,
//readerShouldBlock()用来判断当前线程是否应该被阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT && //MAX_COUNT为获取读锁的最大数量,为16位的最大值
compareAndSetState(c, c + SHARED_UNIT)) {
//firstReader是不会放到readHolds里的, 这样,在读锁只有一个的情况下,就避免了查找readHolds。
if (r == 0) { // 是 firstReader,计数不会放入 readHolds。
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //firstReader重入
firstReaderHoldCount++;
} else {
// 非 firstReader 读锁重入计数更新
HoldCounter rh = cachedHoldCounter; //读锁重入计数缓存,基于ThreadLocal实现
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//第一次获取读锁失败,有两种情况:
//1)没有写锁被占用时,尝试通过一次CAS去获取锁时,更新失败(说明有其他读锁在申请)
//2)当前线程占有写锁,并且有其他写锁在当前线程的下一个节点等待获取写锁,除非当前线程的下一个节点被取消,否则fullTryAcquireShared也获取不到读锁
return fullTryAcquireShared(current);
}
从源代码可以看出,获取读锁的大致步骤如下:
1)通过同步状态低16位判断,如果存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败;否则执行步骤2)。
2)通过readerShouldBlock判断当前线程是否应该被阻塞,如果不应该阻塞则尝试CAS同步状态;否则执行3)。
3)第一次获取读锁失败,通过fullTryAcquireShared再次尝试获取读锁。
readerShouldBlock方法用来判断当前线程是否应该被阻塞,NonfairSync和FairSync中有不同是实现。
//FairSync中需要判断是否有前驱节点,如果有则返回false,否则返回true。遵循FIFO
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
//当head节点不为null且head节点的下一个节点s不为null且s是独占模式(写线程)且s的线程不为null时,返回true。
//目的是不应该让写锁始终等待。作为一个启发式方法用于避免可能的写线程饥饿,这只是一种概率性的作用,因为如果有一个等待的写线程在其他尚未从队列中出队的读线程后面等待,那么新的读线程将不会被阻塞。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
fullTryAcquireShared方法
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
//如果当前线程不是写锁的持有者,直接返回-1,结束尝试获取读锁,需要排队去申请读锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//如果需要阻塞,说明除了当前线程持有写锁外,还有其他线程已经排队在申请写锁,故,即使申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁。
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
//如果当前线程是第一个获取了写锁,那其他线程无法申请写锁
// assert firstReaderHoldCount > 0;
} else {
//从readHolds中移除当前线程的持有数,然后返回-1,然后去排队获取读锁。
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
//示成功获取读锁,后续就是更新readHolds等内部变量,
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
(二)释放读锁
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//更新计数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//自旋CAS,减去1<<16
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
参考资料
(java并发锁ReentrantReadWriteLock读写锁源码分析)http://blog.csdn.net/prestigeding/article/details/53286756
《Java并发编程的艺术》
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。