1.3 线程池-addWorker
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取当前状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果线程数量大于最大值
//或者 如果创建核心线程的情况下,线程数大于核心线程
//或者 创建非核心线程的情况下 线程数大于了最大线程数
//直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//调用了原子类的CAS,如果成功 退出循环 原子类 会在后面章节介绍
if (compareAndIncrementWorkerCount(c))
break retry;
//如果失败了 重新获取一下c的值
c = ctl.get(); // Re-read ctl
// 状态发生了变化 从头开始 ,否则就是走for(;;)逻辑
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//走到这里 才是真正的创建逻辑
//走到这里 肯定会创建一个线程
//如果线程满了 走不到这
//所以workers的数量就是线程的数量
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//new了一个Worker ,Worker是个啥呢 看下面
//简单来说 就是包装了一个线程和Runnable任务的 任务类,负责真正干活的类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//这里有一个Java重入锁 也会在后面介绍 现在知道他就是个锁就行了 起到并发安全的作用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加到队列里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//添加了线程 就启动一下
//因为Worker是个runnable 所以start线程就自动执行run方法了
//run方法逻辑 看下面
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 是个啥
是一个Runnable
又继承了AQS(这个会再重入锁那一章节讲)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
run
既然是个runnable run方法就很重要了
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//先获取当初创建worker时传入的那个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果STOP了 执行interrupt逻辑
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//上面返回任务为null会走到这里 下一章介绍这个方法
processWorkerExit(w, completedAbruptly);
}
}
//找到一个可以执行的任务 主要是这个逻辑
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果SHUTDOWN了 任务数量-1 返回 如果一直是这种状态 就一直减 减到0
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//
int wc = workerCountOf(c);
// Are workers subject to culling?
//allowCoreThreadTimeOut 默认是false
//也就是只有核心线程的情况会 timed是false 取任务
//存在非核心情况下,会在限定keepAliveTime的时间内获取任务,如果没有就返回空
//返回空之后就会走processWorkerExit 这个逻辑
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//从队列里获取一个任务
try {
Runnable r = timed ?
//队列也会在后面章节介绍
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//移除并返回队列头部的元素 如果队列为空,则阻塞
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
compareAndIncrementWorkerCount
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}