ThreadPoolExecutor
ThreadPoolExecutor是Java线程工具类的基础,通过调整它的内置参数可以定制各种用途的线程池,
线程池主要解决两个问题:可以在执行大量异步任务的时候提供高性能,如果没有线程池,那么就会产生这样一种情况,一个线程执行一个任务,线程的生命周期中包括了创建和销毁阶段,大量的创建线程和销毁线程无疑占用了大量的CPU时间以及系统资源,有了线程池,可以让线程池管理线程,让每条线程服务更多的任务,合理的安排空闲线程去执行任务,提高了系统吞吐量,并节约了系统资源
ThreadPoolExecutor拥有很多参数可以设置,但是Java建议大家使用ThreadPoolExecutor的工厂类Executors去创建一个线程池,譬如:newCachedThreadPoll()可以创建一个根据需要创建新线程的线程池,对于执行许多短期异步任务的程序而言,这些线程池通常可以提高程序性能,。如果现有线程没有可用的,则创建一个线程并添加到池中,终止并从缓存中移除那些已经有60s没有被使用的线程。因此,长时间保持空闲的线程不会使用任何资源
它是一个无界线程池,在线程不工作一段时间后进行回收
Executors.newFixedThreadPool.创建一个固定大小的线程池,Executors.newSingleThreadExecutor 一个单独线程的线程池,
通过设置Core和maxinum大小,ThreadPoolExecutor可以自动调节线程池大小,
核心线程只有在任务来临时进行创建并启动,通过使用prestartCoreThread方法或者preStartAllCoreThreads方法,可以预先创建核心线程或者全部线程,这种情况发生在线程池有非空队列时,可以提高服务响应速度
创建一个新的线程
通过ThreadFactory可以创建新的线程,如果没有特殊指定,Executors的defaultThreadFactory会被使用,它创建的线程在同一线程组,线程拥有相同的普通优先级,它也不是守护线程。通过提供不同的线程工厂,可以改变线程名,线程组,线程优先级,守护状态等内容。
线程应该具有modifyThread 运行时权限,如果工作线程或者其他线程使用线程池的时候没有这个权限,服务可能会被降级,修改配置后,线程池不会立即生效这个改变,当关闭线程池的时候,可能会处于temination状态而不是处于completed状态
Keep-alive times
如果线程池当前拥有的线程超过核心线程数量,额外线程在空闲一段时间后将会被停止,这样可以使线程池在较为空闲的时候可以降低系统资源消耗。如果线程池之后变得更加活跃,那么新的线程也是可以被创建出来的,使用setKeepAliveTime(long,TimeUnit)方法可以动态改变这个线程在空闲状态下保持生存的时间。通过设置keepAliveTime到最大值,可以使线程池中空闲线程能够在线程池关闭之前一直保持生存状态,
默认情况下,keep-alive策略只有在线程池中线程数量超过核心线程数量的时候才会生效,
通过设置allowCoreThreadTimeOut(boolean)方法的参数为true,可以让keep-alive策略在线程池线程数不超过核心线程的时候生效。
队列
任何BlockingQueue接口下面的子类都可以用来保持并转移已经提交的任务,队列的使用与线程池当前的线程数量有关系:
1 如果当前线程池中线程数量少于核心线程数,Executor偏向于向线程池中添加新的线程而不是将提交的任务加入到队列
2 如果线程池中线程数量超过或者等于核心线程数量,那么Executor总是偏向于将新来的任务加入到队列中等待,而不是新建一个线程
3 如果新的任务不能入队列,一般情况下是因为队列满了,那么新的工作线程将会被创建出来,线程池线程数量要保证不能超过线程池的线程最大数量,如果线程池中的线程数量已经达到最大值,那么任务将会被拒绝
线程池队列常用的三种实现:
:SynchronousQueue:将任务传递给工作线程而不进行保存,入队会失败如果线程池中此时没有空闲的线程可以接受这个任务,一个新的线程将会被创建,这个策略在处理任务请求时不会去查看任务之间的依赖关系
直接传递法,通常需要没有限制的最大线程池大小来避免因任务过多而导致丢弃任务的现象,但是这样会使线程池中线程数量急剧增加,增加系统消耗
Unbounded queues
使用无界队列来存放任务,当线程池中的所有线程都有任务处理时,新来的任务将会先进队列等待被执行。
使用无界队列的好处是,当任务之间有前后依赖关系时,后面的任务不会插到它所依赖的任务的前面被执行,,任务之间的执行不会相互干扰。比如,在web服务器中,使用这个队列,可以起到缓冲突然激增的请求的作用。
Bounded queues.
有界队列的常用实现是ArrayBolockingQueue,当线程池的最大线程数量是无穷大时,它可以防止资源短缺的情况发生,但是它使线程池很难调优。
有界队列大小和线程池最大线程数大小通常是相对平衡的:
当使用大的有界队列,和小的线程池,会降低CPU使用率,降低OS资源使用。当使用小的有界队列,和大的线程池,将会降低系统吞吐量。
如果任务经常阻塞,系统会使用更多的线程。使用小的有界队列通常需要更大的线程池,这会使CPU更忙,但是会遇到线程池处理完所有工作线程,导致部分工作线程空闲,降低了吞吐量
拒绝任务
当Executor关闭时,新任务将会被拒绝,当使用有限大小的线程池和任务队列时,通过调用RejectExecutionHandler的rejectedExecution(Runnable,ThreadPoolExecutor)方法可以决定拒绝策略,有四种策略提供:
1 默认情况适用ThreadPoolExecutor.AbortPolicy,当队列满时,它会跑出一个运行时异常RejectedExecutionException
2 ThreadPoolExecutor.CallerRunsPolicy 调用这个方法的线程自己去执行这个任务,它提供了一个简单的回馈控制机制,可以减低新任务提交的速率
3 ThreadPoolExecutor.DiscardPolicy,新任务不会被执行直接丢弃
4 ThreadPoolExecutor.DiscardOldestPolicy,如果executor没有关闭,那么工作队列头部的任务将会被丢弃,如果丢弃后,工作队列满还有新来的工作请求,那么重复这一过程。
通过定义新的RejectExecutorHandler,可以根据具体场景处理工作请求。
Worker类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
```
Worker是一个内部类,主要用来维护线程运行任务时中断控制状态,这个类继承了AbstracQueueSynchronized,简化了在任务调度的过程中获取和释放锁的过程。使用非重入锁而不是重入锁,主要是为了在调用了线程池控制方法比如setCorePoolSize()后不能再获得锁,为了保证线程在实际开始运行任务之前不会被中断,将状态stage设置为-1。在runWorker方法中消除这个限制。
可以看到Worker类主要由一个Thread线程,以及一个Runnable接口组成。Thread代表线程池中的线程,Runnable接口则代表具体任务。
Woker类重要的方法就是runWorker(Worker w)方法,
这个方法是一个运行循环,不断重复的从队列中获取任务并执行任务,需要应对如下的问题:
1. 通常Worker带有一个初始任务,如果不要使用第一个任务,只要线程池还在运行,可以使用getTask方法获取任务。如果它返回null,worker会因为线程池状态改变或者配置参数变化而退出。其他退出的原因则包括外部代码抛出异常,通常会导致processWorkerExit来替换当前线程。
2. 在运行任务之前,需要获得锁资源来防止其他线程池中断任务的执行,我们可以保证除非线程池被关闭,当前线程都不会设置interrupt状态位。
3. 每个任务执行执行之前都会调用beforeExecute方法,这个方法可能会跑出一个异常,这样可能会导致线程没有执行完任务就死亡
4. 假设beforeExecute正常完成,运行了任务,收集所有异常发送到afterExecute,分开处理运行时异常,
5. 当任务执行完毕,调用afterExecute方法,它也可能会跑出异常,导致线程死亡,根据JLS,这个异常会一直生效即使task.run方法抛出了异常。
6. 通过线程的UncaughtExceptionHandler和afterExecute,可以为用户代码提供详细的异常信息。
Worker方法的源码解读:
```java
final void runWorker(Worker w) {
//外部程序调用了Worker的run方法后,会启动一个新的线程,这里是获取创建出来的线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//是否因为异常而退出循环的标志位
boolean completedAbruptly = true;
try {
//只有当Worker的初始化任务为空,或者Worker从任务等待队列中获取
//任务获取不到时,Worker会退出循环,线程会走向死亡。
while (task != null || (task = getTask()) != null) {
//进行加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//线程运行前需要执行的操作,默认为空
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//Worker完成任务加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行Worker清理工作
processWorkerExit(w, completedAbruptly);
}
}
整体的执行逻辑就是,Worker如果没有初始化任务,需要不断的从任务等待队列拿出任务出来
并执行,如果获取不到任务,就会跳出循环,线程执行完毕进行清理,线程走向消亡。
这段代码上有两个比较重要的方法,一个是getTask()方法,另外一个是processWorkerExit()
方法,这两个方法实现比较复杂。
首先来看getTask()方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//获取线程池的状态成员,这个变量是一个AtomicInteger类型,高三位用来
//表示线程池运行状态,低29位来表示线程数量
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果线程池状态大于SHUTDOWN并且工作等待队列是空的
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//线程池减少worker数量
decrementWorkerCount();
return null;
}
//worker
int wc = workerCountOf(c);
// Are workers subject to culling?
//检查线程池是否具备减员的资格,条件是,线程池是否设置了当线程池
//中线程数量不超过核心线程数对空闲线程进行清理,也就是
//allowCoreThreadTimeOut这个boolean变量,或者当前线程数量大于
//核心线程数,这两个条件二者满足一条都会触发线程池清理空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当且仅当线程池具有清退线程资格以及线程池中有空闲线程时,才能够
//让线程池减少线程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//当线程池中线程数大于核心线程数或者设置了允许核心线程也进行减员,Worker
//线程从工作等待队列中获取任务时,可以进行一段时间的等待,在这个等待的时间
//内获取任务都是可以获取到任务的,超过这个等待时间就会直接导致线程退出循环
//然后线程池减少自己的线程数。如果获取到了,直接返回这个任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()方法主要逻辑就是从工作等待队列中获取任务,在获取任务之前,先进行线程池状态
判断,如果线程池处于STOP状态或者大于STOP状态并且工作队列为空,则减少线程池中线程数。
然后依据是否允许核心线程空闲超时,线程池线程数是否大于核心线程数,在获取工作队列中
的任务时,进行等待获取,在等待时间内获取到任务就可以正常获取任务。否则获取不到任务,
线程池会依据线程池状态清理多余空余线程。
其二就是processWorkerExit()方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//从workers列表删除这个Worker记录
} finally {
mainLock.unlock();
}
//尝试进行线程池的状态转为terminated
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//如果没有因为异常而退出循环
if (!completedAbruptly) {
//检查是否允许核心线程超时,如果true则0,否则corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果工作队列不为空,最小值为0,则不允许线程池中不留一条线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//当线程池中的线程数大于最小值,则不需要替换这个线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果Worker因为异常而退出,或者通过上面的考验,则需要添加新的Worker。
addWorker(null, false);
}
}
可以看到Worker收尾工作任务,主要做了两个任务
1.对Worker完成的任务数量进行结算,加入到总体的完成任务中,从Worker列表中删除Worker
2.当Worker因为异常退出,或者线程池中线程数量小于CorePoolSize时,需要增加Worker数量。
再来看addWorker方法的实现。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//当线程池状态大于等于SHUTDOWN状态,也就是可能是SHUTDOWN,TIDYING,
//TERMINATED状态,但是要排除一种情况,这种情况是,线程池处于SHUTDOWN
//状态,初始任务为空并且工等待队列不为空的情况。则不会进行后续添加
//Worker的行动
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//成功增加了线程池的线程数量,则进行后续添加Worker行动
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程池工作状态不一致,继续循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker
w = new Worker(firstTask);
//获取Worker对应的Thread
final Thread t = w.thread;
//检查Worker对应线程是否为空,Worker对应的Thread是由ThreadFactory创建
//的,可以由用户指定,或者使用线程池默认的
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取线程池工作状态
int rs = runStateOf(ctl.get());
//如果线程池的状态是RUNNING,或者是SHUTDOWN但是没有初始任务
//则可以向线程池中添加新的Worker,否则不能添加
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动Worker的工作线程,处理工作任务。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//添加Worker失败的处理
addWorkerFailed(w);
}
return workerStarted;
}
上述添加的Worker的过程,主要分三块,添加Worker之前的检查,如果不能通过就会直接返回false
添加Worker,就是创建一个Worker对象,向线程池中将Worker的引用保存到Set集合中。
以及添加Worker失败后的失败处理。
addWorkerFailed方法非常简单
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
主要做了三件事情,从线程池中移除Worker引用,减少Worker数量,尝试将线程池状态转换为
Terminated状态。
至此线程池中Worker的代码已经全部结束了,现在来看线程池的主要方法
线程池ThreadPoolExecutor主要的功能是管理线程池,接受新的工作任务。
首先来看接受新的工作任务,所做的工作,对应的代码是execute()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面的英文注释很清楚的解释了这段代码所做的事情,
- 当工作线程数小于核心线程数时,尝试添加Worker,添加成功直接返回,否则进行后续处理
- 当添加Worker失败后,先检查线程池是否在运行,再将任务添加到工作等待队列中,如果false
转向3,如果true,则再次检查线程池是RUNNING状态,如果不是并且之后从工作等待成功移除了
任务,那么拒绝当前任务。如果工作线程数为0,那么添加一个初始任务为空的Worker - 再次添加一个带有初始任务的Worker,如果添加失败则拒绝任务。
reject(command)是execute()方法中出现次数比较多的方法,这个方法的代码为:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
可以看到它调用了handler的rejectedExecution方法,这个方法有两个参数,一个是当前
任务,另外一个是this,代表当前线程池ThreadPoolExecutor实例。
前面提到,线程池的拒绝策略主要默认有4中策略,拒绝时抛出异常,拒绝时由主线程自己执行任务
拒绝时不做任何处理,拒绝时丢弃等待队列等待最长任务。
最后来看线程池ThreadPoolExecutor的管理方法,由于方法比较多,摘出常用关键的方法来看
首先是shutdown方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否具有关闭线程池权限
checkShutdownAccess();
//将线程池工作状态转换为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//中断所有的等待线程
interruptIdleWorkers();
//专门为ScheduledThreadPoolExecutor预留的方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试转换为Terminated状态
tryTerminate();
}
处理过程相对比较简答,只是将线程池状态改为SHUTDOWN状态,并中断所有等待任务的Worker而已。
然后是shutdownNow方法,
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow方法和shutDown方法处理逻辑大致相同,不同的是状态转换,以及多了drainQueue
方法,
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
drainQueue方法主要是为了将任务等待队列中的任务全部移出队列,并返回任务列表
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
awaitTermination方法主要是在一定时间范围内等待线程池的状态是否为TERMINATED状态,如果
超过这段时间则返回false
至此线程池源码全部解读完毕,做一下总结
- ThreadPoolExecutor提供了7个主要的参数,分别是核心线程数、最大线程数、保持活跃时间
值、时间单位、工作等待嘟咧、线程工厂、拒绝策略处理器。 - ThreadPoolExecutor留下了许多待扩展的空方法,比如beforeExecute方法,这个方法在
Worker运行任务之前会被调用,在这个方法里面可以做重新初始化ThreadLocals,记录日志等功能。 - ThreadFactory类可以自定义,ThreadPoolExecutor默认使用的线程工厂DefaultThreadFactory
并不一定满足所有人需求,通过自定义DefaultThreadFactory实现,可以让线程池中生产的线程
能够满足特殊的需要,比如名称,线程组等。 - 理解线程池的4个状态非常重要,状态之间的各种转换,对于理解线程池中线程的运行状态有
很大帮助。 - 在不同场景合理的使用corePoolSize,maxPoolSize,maxKeepAliveTime参数,可以节约系统
资源消耗。 - 使用preStartCoreThread、prestartAllCoreThreads方法,可以不用等待任务的到来,就预先
创建一定数量的线程,可以提高线程池启动期间的响应速度。
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。