并发编程AQS源码分析
并发编程AQS源码分析
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。它是一个Java提高的底层同步工具类,比如CountDownLatch、ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的
简单来说:是用一个int类型的变量表示同步状态,并提供了一系列的CAS操作来管理这个同步状态对象
一个是 state(用于计数器,为0时释放锁)
一个是线程标记(当前线程是谁加锁的),
一个是阻塞队列Node(用于存放其他未拿到锁的线程)
例子:线程A调用了lock()方法,通过CAS将state赋值为1,然后将该锁标记为线程A加锁。如果线程A还未释放锁时,线程B来请求,会查询锁标记的状态,因为当前的锁标记为 线程A,线程B未能匹配上,所以线程B会加入阻塞队列,直到线程A触发了 unlock() 方法,这时线程B才有机会去拿到锁,但是不一定肯定拿到
源码分析阶段
直接看ReentrantLock中的lock方法加锁是如何使用到AQS的、ReentrantLock分为公平锁和非公平锁、默认是非公平锁
公平锁:按照队列先进先出的顺序进行加锁解锁
非公平锁:哪个线程先抢到锁就是哪个的、有可能造成某一个线程一直抢不到锁
非公平锁
// 非公平锁
final void lock() {
// 进行cas操作、state值为0则赋值为1、成功获取锁
if (compareAndSetState(0, 1))
// 设置线程标记、线程标记用来检测是否是重入锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用AQS中的acquire方法、在调用ReentrantLock类下定义的Sync类中的nonfairTryAcquire方法进行具体的锁操作细节
// 传参1、里面一直进行for循环CAS赋值、直到哪个线程抢到返回
acquire(1);
}
// acquire方法是操作state状态位的方法、通过tryAcquire里面的CAS原子操作检测锁状态
public final void acquire(int arg) {
// tryAcquire调用nonfairTryAcquire方法
if (!tryAcquire(arg) &&
// addWaiter: 根据给定模式创建一个当前线程的Node节点并返回、当前是创建一个独占锁的Node节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断当前线程
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
// 先获取当前线程标识符、用于检测当前对象的线程标识符是否是同一个、同一个则为可重入锁、可直接返回、不是则返回后调用acquireQueued
final Thread current = Thread.currentThread();
// 获取计数器、表明当前线程重入了多少次锁
int c = getState();
// 为0则代表当前线程的锁全部解锁完毕
if (c == 0) {
// 给state计数器加1、代表上锁
if (compareAndSetState(0, acquires)) {
// 设置对象的线程标识符为当前线程
setExclusiveOwnerThread(current);
// 直接返回
return true;
}
}
// 代表当前有线程在使用该对象锁、检测是否是同一线程、是则进入、为可重入锁
else if (current == getExclusiveOwnerThread()) {
// 计算state
int nextc = c + acquires;
// 小于0则代表锁过多、即0x7fffffff + 1为负数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 赋值state
setState(nextc);
// 直接返回
return true;
}
// 返回false、表示当前对象锁被其他线程占用了、需要等到加锁的线程解锁才进行抢占
return false;
}
// 根据给定模式创建一个当前线程的Node节点并返回
private Node addWaiter(Node mode) {
// 根据当前线程创建一个Node节点、并传入模式(独占锁、共享锁)
Node node = new Node(Thread.currentThread(), mode);
// 获取队列尾部Node节点
Node pred = tail;
// 检测队列是否存在尾部节点、即是否存在节点
if (pred != null) {
// 新创建的Node节点上一个节点设为队列的尾部节点、然后下面直接将新创建节点插入队列尾部
node.prev = pred;
// 通过cas操作更换节点顺序、即新创建的节点为尾部、原先尾节点的下一个节点设置为当前新创建的Node节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 将新创建的Node节点插入队列尾部
enq(node);
return node;
}
// 里面for死循环无限调用nonfairTryAcquire方法检测锁是否被释放、然后进行抢占加锁
final boolean acquireQueued(final Node node, int arg) {
// 错误位、当finally执行时该变量为true、则代表有异常发生、取消当前的操作
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回Node节点的上一个节点
final Node p = node.predecessor();
// 检测是否是头节点、是的话在进行cas检测锁是否解锁在抢占
if (p == head && tryAcquire(arg)) {
// 抢占到锁了、设置当前线程的Node节点为等待队列的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 检测当前线程是否调用了interrupt中断线程方法、并且检测状态位进行阻塞、调用LockSupport.park(this)方式阻塞自己
parkAndCheckInterrupt())
// 如果循环走到这里在return返回的话、代表线程将中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}