Java线程池——Executor框架
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
Executor框架的组成:
- 任务(Runnable/Callable):任务通过Runnable接口或Callable接口进行定义。Runnable接口或Callable接口实现类都可以被 ThreadPoolExecutor执行
- 任务的执行(Executor):任务执行机制的核心接口 Executor,以及继承自Executor接口的ExecutorService接口。ThreadPoolExecutor实现了ExecutorService接口
- 异步的计算结果(Future):Future接口以及Future接口的实现类FutureTask类都可以代表异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutor执行后就会返回一个Future对象
一、Executor接口
线程池简化了线程的管理工作, 并且JUC提供了一种灵活的线程池实现来作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread而是Executor,Executor只定义了execute一个方法,是最顶层的接口。
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
execute方法接受一个Runnable参数,这个方法定义为在未来的某个时间执行传入的方法,方法的运行可以在一个新的线程,在线程池或者在调用的线程中,该方法无法接收到线程的执行结果(当然Runnable接口本身也没有返回值)。
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
**Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。**如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式就是使用Executor。
二、ExecutorService接口
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它的run方法执行任务后不能返回一个值或者抛出一个受检查的异常。许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,它认为主入口点(call)将返回一个值,并可能抛出一个异常。
因此引入了ExecutorService,它继承自Executor,并且在其基础上增加了很多功能,可以生成用于跟踪一个或多个异步任务进度的Future的方法,以解决Executor的局限性。
Future表示一个任务的生命周期,并且提供了响应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
在Future规范中包含的隐含意义是任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态。
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已完成那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了一场,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。
定义了以下方法:
-
void shutdown()
:启动有序关机,其中执行先前提交的任务,但不接受新任务。如果调用已经关闭,则没有额外的效果。不会阻塞等待先前提交的任务执行完成 -
List<Runnable> shutdownNow()
:尝试停止所有正在执行的任务,停止对等待任务的处理,并返回等待执行的任务列表。不会阻塞等待正在执行的任务终止- 只是尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未能响应中断的任务可能永远不会终止
-
boolean isShutdown()
:如果此执行器已关闭(调用了关闭方法),则返回true -
boolean isTerminated()
:如果关闭后所有任务都已完成,则返回true。注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真 -
boolean awaitTermination(long timeout, TimeUnit unit)
:阻塞直到所有任务在关机请求后完成执行,或者超时发生,或者当前线程被中断,以先发生的为准 -
<T> Future<T> submit(Callablet task)
:提交一个带返回值的任务以供执行,并返回表示该任务的挂起结果的Future。Future的get方法将在成功完成任务时返回任务的结果 -
<T> Future<T> submit(Runnable task, T result)
:提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回给定的结果 -
Future<?> submit(Runnable task)
:提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回null -
<T> Listfuture<t> invokeAll(Collection? extends Callable<T> tasks)
:执行给定的任务,并在所有任务完成时返回保存其状态和结果的future列表。每个Future对象的isDone方法都会返回true。请注意,已完成的任务可以正常终止,也可以抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的 -
<T> T invokeAny(Collection? extends Callable<T> tasks)
:执行给定的任务,如果有成功完成的任务,则返回成功完成的任务的结果(即不抛出异常)。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。
三、ThreadPoolExecutor类
ThreadPoolExecutor实现了ExecutorService(实际上是继承了AbstractExecutorService),为了在广泛的上下文中发挥作用,该类提供了许多可调参数和可扩展性挂钩:
- corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
- maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
- keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程(救急线程)会在多长时间内被销毁
- unit:keepAliveTime的单位
- workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
- threadFactory:线程工厂,用于创建线程,一般用默认即可
- handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务
1、状态
- RUNNING(-1<<29):接受新任务并且处理排队任务
- SHUTDOWN(0<<29):不接受新任务但是处理排队任务
- STOP(1<<29):不接受新任务也不处理排队任务,同时中断处理中的任务
- TYDING(2<<29):所有任务被终止,工作线程数为0之后进入该状态,并且会执行
terminated()
钩子函数 - TERMINATED(3<<29):
terminated()
钩子函数执行完毕后
状态单调地随时间增加,但不需要达到每个状态。
- RUNNING->SHUTDOWN:调用shutdown()
- RUNNING/SHUTDOWN->STOP:调用shutdownNow()
- STOP->TYDING:当队列和池都为空时
- TIDYING -> TERMINATED:terminated()钩子函数执行完毕后
ThreadPoolExecutor中对于状态的记录保存在一个AtomicInteger类型的变量中,其中高三位就是用于记录线程池的状态,而低的29位用于记录线程数量。
2、Worker
ThreadPoolExecutor中定义了一个私有静态类Worker,其继承自AbstractQueuedSynchronizer类,并实现了Runnable接口。其中维护了线程实例(Thread)、任务实例(Runnable)、线程任务计数器(long)变量。
这个类适当地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。这可以防止中断,这些中断旨在唤醒等待任务的工作线程,而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获得锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁状态初始化为负值,并在启动时(在runWorker中)清除它。
3、扩展
该类还定义了三个protected类型的钩子函数:
- beforeExecute:线程池任务运行前执行
- afterExecute:线程池任务运行后执行
- terminated:线程池退出后执行
这几个方法在ThreadPoolExecutor中为空实现。
四、ForkJoinPool类
Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork就是把一个大任务切分为若干个子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。
1、工作窃取算法
ForkJoinPool是运行ForkJoinTasks的ExecutorService。**ForkJoinPool与其他类型的ExecutorService的区别主要在于使用了工作窃取。工作窃取算法是指某个线程从其他队列里窃取任务来执行。**那么为什么要使用工作窃取算法呢?**假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不干扰的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。**比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而这时它们会访问同一个队列,所以为了减少窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
2、Fork/Join的设计
想要设计一个Fork/Join框架,需要完成两个步骤:
- 分割任务:需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小
- 执行任务并合并结果:分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情:
-
ForkJoinTask(抽象类):我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction(抽象类):用于没有返回结果的任务
- RecursiveTask(抽象类):用于有返回结果的任务
-
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个线程。
public class CountTask extends RecursiveTask<Integer> {
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}
RecursiveTask需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小就必须分割成两个子任务,每个子任务在调用fork方法时又会进入compute方法。最后使用join方法等待子任务执行完成并得到其结果。
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法来获取异常。
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。
3、执行原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。
pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后再调用ForkJoinPool的signalWork反复噶唤醒或创建一个工作线程来执行任务。
五、ScheduledThreadPool类
ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor使用任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的squenceNumber变量小的先执行)。
1、ScheduledExecutorService
ScheduledThreadPool类继承自ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。
ScheduledExecutorService接口定义了几个方法:
- ScheduleFuture<?> schedule(Runnable command, long delay, TimeUnit unit):提交在给定延迟后启用的一次性任务
- ScheduleFuture schedule(Callable command, long delay, TimeUnit unit):提交在给定延迟之后启用的带返回值的一次性任务
- ScheduleFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):提交一个周期性任务,任务开始时进行计时(如果任务执行时间过长甚至超过period时间,会导致任务连续执行)
- ScheduleFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):提交一个周期性任务,任务执行完成之后才进行计时
ScheduledFuture继承自Delayed接口和Future接口,自己本身没有定义新的方法。
Delayed接口是一个混合风格的接口,用于标记应该在给定延迟后执行的对象。这个接口定义了一个getDelay方法,用于返回剩余的延迟时间。此外这个接口继承了Comparable接口,意味着此接口的实现必须定义一个compareTo方法,该方法提供与其getDelay方法一致的排序
2、比较Timer
- Timer对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是
- Timer只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程
- 在TimerTask中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机,即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行
六、Executors类
Executors是Java中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单。Executors工厂类提供的线程池有以下几种类型:
- newCachedThreadPool():CachedThreadPool的corePoolSize 被设置为0,maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新的线程。极端情况下,这样会导致耗尽 cpu和内存资源
- newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中包含指定数量的线程。线程数量是固定的,不会自动扩展,即没有救急线程
- newSingleThreadExecutor():创建一个单线程的线程池。这个线程池中只包含一个线程,用于串行执行任务。适用于需要按顺序执行任务的场景
- newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,用于定时执行任务。线程数量固定,不会自动扩展。适用于定时执行任务的场景
- newSingleThreadScheduledExecutor():创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务
- newWorkStealingPool(int parallelism):该线程池维护足够的线程以支持给定的并行级别,并且可以使用多个队列来减少争用。并行性级别对应于积极参与或可用参与任务处理的最大线程数。实际的线程数可以动态地增加和减少。工作窃取池不能保证所提交任务的执行顺序
除此之外还提供了创建ThreadFactory实例,将Runnable实例转换为Callable实例等方法。