如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出 RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
:此策略将丢弃最早的未处理的任务请求。举个例子:Spring 通过 ThreadPoolTaskExecutor
或者我们直接通过 ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler
拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy
。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor
将抛出 RejectedExecutionException
异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicy
。CallerRunsPolicy
和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接主线程执行,而不是线程池中的线程执行
r.run();
}
}
}
根据上面对线程池拒绝策略的介绍,相信大家很容易能够得出答案是:CallerRunsPolicy
。
这里我们再来结合CallerRunsPolicy
的源码来看看:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//只要当前程序没有关闭,就用执行execute方法的线程执行该任务
if (!e.isShutdown()) {
r.run();
}
}
}
从源码可以看出,只要当前程序不关闭就会使用执行execute
方法的线程执行该任务。
我们上面也提到了:如果想要保证任何一个任务请求都要被执行的话,那选择 CallerRunsPolicy
拒绝策略更合适一些。
不过,如果走到CallerRunsPolicy
的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。
这里简单举一个例子,该线程池限定了最大线程数为 2,阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略),ThreadUtil
为 Hutool 提供的工具类:
public class ThreadPoolTest {
private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
public static void main(String[] args) {
// 创建一个线程池,核心线程数为1,最大线程数为2
// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第一个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第二个任务,由于核心线程被占用,任务将进入队列等待
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理入队的第二个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理第三个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
threadPoolExecutor.execute(() -> {
log.info("主线程处理第四个任务");
ThreadUtil.sleep(2, TimeUnit.MINUTES);
});
// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第五个任务");
});
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
输出:
18:19:48.203 INFO [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务
从输出结果可以看出,因为CallerRunsPolicy
这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致 OOM。
我们从问题的本质入手,调用者采用CallerRunsPolicy
是希望所有的任务都能够被执行,暂时无法处理的任务又被保存在阻塞队列BlockingQueue
中。这样的话,在内存允许的情况下,我们可以增加阻塞队列BlockingQueue
的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。
为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize
(最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue
的任务过多导致内存用完。
如果服务器资源以达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?
这里提供的一种任务持久化的思路,这里所谓的任务持久化,包括但不限于:
这里以方案一为例,简单介绍一下实现逻辑:
RejectedExecutionHandler
接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。BlockingQueue
实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue
。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()
方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue
中去取任务。整个实现逻辑还是比较简单的,核心在于自定义拒绝策略和阻塞队列。如此一来,一旦我们的线程池中线程以达到满载时,我们就可以通过拒绝策略将最新任务持久化到 MySQL 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免"饥饿"问题。
当然,对于这个问题,我们也可以参考其他主流框架的做法,以 Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
//创建一个临时线程处理任务
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
//限时阻塞等待,实现尽可能交付
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});