在学习了CAS、原子类、AQS、各种锁、并发队列之后我们开始学习线程池,既是对前面的巩固也是对技术更进一步探索,可以说大部分的框架都离不开线程池,
所以理解它对我们后期的学习有非常大的帮助
介绍
线程池主要解决两个问题:
- 当执行大量并发任务时,线程池能提供较好的性能,不需要每次使用new来创建线程对象,减少开销
- 线程池实现了对线程的管理和资源限制,以及一些统计数据
在concurrent包中我们可以使用Executors工具类创建线程池,工具包中有不同的实现,根据需要,返回不同的线程池实例。
ThreadPoolExecutor是其中最基础也是用的最多的一种,所以我们从它开始入手
类图构成
从类图中可以看到,它有一个原子变量ctl,它是用来记录线程池状态和线程个数的,有点类似读写锁中的state
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3; // 计数位为int类型位数-3(下面我们假设是32位),也就是低29位
- private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 容量是29个1
- private static final int RUNNING = -1 << COUNT_BITS; // 高三位11100000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
- private static final int STOP = 1 << COUNT_BITS; // 001
- private static final int TIDYING = 2 << COUNT_BITS; // 010
- private static final int TERMINATED = 3 << COUNT_BITS; // 011如果看过读写锁部分,就应该觉得很熟悉的操作了,都是通过位运算来获取状态,其高三位代表状态:
1
2
3private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取高三位
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取低29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取ctl值 - RUNNING:处理阻塞队列里的任务,并且接受新任务
- SHUTDOWN:处理阻塞队列里的任务,但不接受新任务 调用shutdown()方法时
- STOP:中断任务,且不处理阻塞队列里的任务,并且不接受新任务 显式调用shutdownNow()
- TIDYING:表示在所有任务执行完后(包括阻塞队列)执行terminated方法,当线程池为空
- TERMINATED 终止状态3
其他核心参数:
- BlockingQueue
workQueue; 用于保存等待执行的任务的阻塞队列,也就是我们之前讲到过的那些并发队列 - ReentrantLock mainLock 独占锁,用来控制worder的原子性,比如新增worker
- HashSet
workers 线程池执行任务的对象,也就是真正工作的线程 - int corePoolSize; 核心线程个数
- int maximumPoolSize; 线程池最大线程数量
- RejectedExecutionHandler handler 饱和策略,也就是队列满了,并且线程个数达到最大值采取的处理方式,如抛出异常、丢弃但不抛出异常
- keepAliveTime 当线程数量超过了corePoolSize指定的线程数,并且空闲线程空闲的时间达到当前参数指定的时间时该线程就会被销毁,如果调用过allowCoreThreadTimeOut(boolean value)方法允许核心线程过期,那么该策略针对核心线程也是生效的
- threadFactory: 创建线程的工厂,如果未指定则使用默认的线程工厂
方法源码分析
execute
线程池使用的核心方法是execute,该方法的作用是提交任务到线程池中执行。 - execute 总结上面的调度策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//三种情况,第一种是当前线程个数小于corePollSize,开启新的新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))// 有可能线程数达到核心线程限制,则失败
return;
c = ctl.get();
}
// 第二种情况,如果状态是Running,则添加任务到阻塞队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次检查,如果不是Running状态,则执行删除并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则如果当前线程为空,则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 如果是
}
// 第三种情况,如果队列吗,则新增线程,新增失败再执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
- 如果线程池中的线程数小于corePoolSize,那么每来一个任务都会创建一个新的线程
- 若当前执行的任务达到了corePoolSize指定的线程数时,也即所有的核心线程都在执行任务时,此时来的新任务会保存在workQueue指定的任务队列中;也就是第二种情况
- 当所有的核心线程都在执行任务,并且任务队列中存满了任务,此时若新来了任务,那么线程池将会创建新线程执行任务;也就是第三种情况
- 如果队列满了,并且线程数达到了maximumPoolSize,则reject ,也就是第三种情况但新增失败。(所以任务的最大数是queueSize+maximumPoolSize)
由上面代码可知,主要逻辑集中在addworker上
- addWorker(Runnable firstTask, boolean core) addWorker方法中由于没有做同步,因此有很多判断线程状态的逻辑,我们注意到一般判断都是以shutdown为分界线,shutdown以上自不必说,肯定不能再创建线程了,只要注意如果线程池为shutdown状态时如何处理就行,只有当入参firstTask为null,且队列不为空时才会增加工作线程,其他不增加。 那么重点就在于什么时候会入参为null呢?我们先往下看,再回来解答这个问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // 走到这就代表可以安全的创建线程了
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 主要分两部分,前半部分使用cas来更新工作线程数量,后半部分使用加锁来创建线程。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // work线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 增加工作线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新已使用的最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker执行
要了解Worker这个类,我就可以从它的官方注释说起
- 为什么要继承AQS,也就是为什么要锁?
This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run.
翻译过来就是,为了中断正在等待任务的线程,而不是中断正在运行的线程。所以,实现锁,主要是为了运行中的线程不被中断(加锁就不会)。 - 为什么不使用ReentrantLock而是自己实现呢?
because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize.
翻译过来就是,不想让在调用线程池方法时,重入获取锁。(会调用interruptIdleWorkers,tryLock会重入,来中断线程)
构造函数
1
2
3
4
5Worker(Runnable firstTask) {
setState(-1); // 防止执行runWorker之前的中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}run 这个是现成的方法,实际里面执行的是runWorker(this)
1
2
3public void run() {
runWorker(this);
}结合上文中说到再addWorker成功后会启动线程,也就是执行这里的runWorker(this)方法。
runWorker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts state设置为0可以相应中断了。
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 到这说明还有任务,开始执行任务
w.lock(); // 防止被shutdown
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 执行前的钩子函数,默认是什么都不做
Throwable thrown = null;
try {
task.run(); // 实际执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 统计完成了多少个任务
w.unlock();
}
}
completedAbruptly = false; // 到这说明没有异常
} finally {
processWorkerExit(w, completedAbruptly);// 清理函数
}
}函数的作用就是不断的循环,去getTask取任务,如果有任务则执行任务,在没有任务后,线程会执行清理并退出函数
getTasK 获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 到这说明没有任务了,则将工作线程worker减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果设置了超时时间,或者线程数超出核心线程数则为true。都使用超时时间
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 到这说明,线程数量已经足够多,或者已经超时,则返回null,并减少1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 是否使用超时,采用的获取方式不一样。
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}上面这个函数主要关注超时部分,两种情况,一种是核心线程需要超时,另一种就是线程数超过核心线程。
processWorkerExit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted,表示意外结束,则表示在run()发生了异常,则需要数量-1 ,如果不是,则不需要,因为在getTask()中-1了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();// 释放线程,需要加锁。
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 从workers中移除
} finally {
mainLock.unlock();
}
tryTerminate();// 尝试终止线程池,因为有可能现在已经没有任务了
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // running状态或shutdown状态,则还要增加一个线程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 如果大于核心线程数则退出
}
// 也就是说小于核心线程数才会增加
addWorker(null, false);
}
}注意之前的问题,什么时候addWorker参数为null时,shutdown状态也是可以添加的,也就是要去完成队列里的任务。
关闭线程池
shutdown
调用shutdown方法后,线程池不再接受任务,但工作队列中的任务还是会被执行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 权限检查
advanceRunState(SHUTDOWN); // 设置状态
interruptIdleWorkers(); // 中断等待任务的线程(1)
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 可重入锁
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); // 中断线程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}1)这里可能会疑惑,怎么中断的?回到runWorker()函数中,我们发现能执行到这里说明拿到锁了,那么中断可能在
task != null || (task = getTask()) != nul
这条语句中。其实就是在getTask()中往getTask()函数下面看,找到,worQueue.poll或者workQueue.take();这两个函数都是获取任务队列中的下个任务,而且都是加锁且响应中断。所以我们中断的作用是是在这。
当然肯定还会问,只是结束这一次的循环获取任务而已,并不会中断啊。确实没错,它仅仅是结束这一次的获取。但是,如果这是队列处在SHUTDOWN并且队列为空时,那么就会退出,并且移除线程,从而达到中断清理线程的目的。那如果队列中还存在呢,这又回到我们之前说的,shutdown状态,仅仅是不再接受任务,但还是会处理队列中的任务。所以还是会继续获取任务,只有等任务处理完毕,才开始一个一个退出2)在shutdown执行完,仍有任务,队列不会退出,那最后是如何清理的呢。
答案是之前讲过的processWorkerExit(w, completedAbruptly); 在没有任务之后,线程会自动退出从workers中移除,然后执行tryTerminate()
tryTerminate()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); //中断workers中的一个
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 到这说明已经没有线程了 workerCountOf(c) == 0,则设置状态为TERMINATED
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); // 唤醒所有等待terminate的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}shutdownNow
shutdownNow和shutdown函数不一样,它会结束所有的线程,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() { // 不是interruptIdleWorkers
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted(); // 无需获取锁,也就是执行中的任务也会被中断。
} finally {
mainLock.unlock();
}
}
总结
创建线程池
使用Executors工具类创建,根据最大线程数、核心线程数、超时空闲时间来创建。添加任务
- 如果线程数小于核心线程数,则直接添加线程
- 如果线程数达到核心线程数,则将任务添加到任务阻塞队列
- 如果线程数达到核心线程数,且任务队列满了,则创建新线程
- 如果线程数达到最大线程数,且任务队列满了,则拒绝任务
执行任务
- 在创建线程时获取任务或者循环从任务队列中获取任务,并执行
- 在线程数小于核心线程数时,会一直阻塞获取任务
回收线程
- 如果一直没有获取到线程且超过核心线程则退出
- 调用shutdown后,会中断等待任务的线程,如果没有任务,则全部退出。如果有任务,则核心线程数内的线程会继续去等待任务,所有任务完成后,退出