[转帖]我所知道的线程池

知道,线程 · 浏览次数 : 0

小编点评

**线程池概述** 线程池是执行定时任务的工具,它可以并行执行多个任务,节约线程资源。 **ScheduledThreadPoolExecutor** ScheduledThreadPoolExecutor 是一个线程池,它可以执行定时任务。它继承了ThreadPoolExecutor,可以设定corePoolSize、threadFactory、rejectedExecutionHandler等。 **主要构造函数** * **newSingleThreadScheduledExecutor():**设定核心线程数。 * **newScheduledThreadPool(1):**设定线程数。 * **newThreadPoolExecutor(int corePoolSize):**设置核心线程数。 * **newThreadPoolExecutor(int corePoolSize, int threadFactory, RejectedExecutionHandler rejectedExecutionHandler):**设置核心线程数、线程工厂、拒绝执行器。 **重要方法** * **schedule():**安排一个任务在指定时间内执行。 * **scheduleWithFixedDelay():**安排一个任务在指定时间内执行,并延迟执行。 * **scheduleAtFixedRate():**安排一个任务在指定时间内执行,并重复执行。 * **scheduleWithFixedDelay():**安排一个任务在指定时间内执行,并延迟执行。 **示例** ```java //创建线程池 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); //安排任务 executor.schedule(new Runnable() { @Override public void run() { System.out.println("执行任务"); } }, 10, TimeUnit.SECONDS); //执行任务 executor.schedule(new Runnable() { @Override public void run() { System.out.println("执行任务"); } }, 20, TimeUnit.SECONDS); //关闭线程池 executor.shutdown(); ``` **注意** * 线池可以设置阻塞队列,但默认情况下不设置。 * 如果关闭线程池,则必须使用handler拒绝任务。 * 线池存在至少一个线程,即使关闭线程池,也能执行周期任务。

正文

https://bigbully.github.io/%E7%BA%BF%E7%A8%8B%E6%B1%A0

 

线程池其实或多或少都用过,不过这是我第一次阅读它的源码,包括源码附带的非常详尽的注释。发现我之前对于线程池的理解还是很浅薄的。

其实从ThreadPoolExecutor.java顶部200多行的注释就能一定程度上了解线程池的用法了。

首先看一下线程池的初始化方式:

  1. public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

参数说明 - corePoolSize 核心线程数:提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 - maximumPoolSize 最大线程数:线程池允许存活的线程数的最大值。核心线程数不能超过最大线程数。 - keepAliveTime 非核心线程数的存活时间:如果核心线程数小于最大线程数,那么核心线程之外的那些线程当存活超过keepAliveTime,就会被终止。 - unit 存活时间的单位 - workQueue 线程池使用的队列 - threadFactory 线程工厂 - handler 当线程池拒绝提交新的任务时使用的策略。

默认构造方法的参数着实不少,一开始我根本无法理解为什么需要这么多初始化参数,不过在源码的注释中,针对不同的场景如何设置线程池参数的示例。

首先需要了解的是corePoolSize,maximumPoolSize,workQueue,handler四者之间的关系。

image

依照这个关系图,在源码注释中给出了3种推荐策略:

推荐策略

  • 直接的任务传递:最大线程数设为Integer.MAX_VALUE,workQueue使用SynchronousQueue,因为SynchronousQueue自身特性不保留任何元素,所以当核心线程都在使用时,任何提交的任务都会创建新的非核心线程。这种方式任务处理的吞吐量最大,不过会消耗更多地资源。
  • 无界队列:可以使用LinkedBlockingQueue这种无界队列,这时最大线程数这个参数不再有意义。当任务提交频率较平缓时,且所有提交的任务之间都彼此独立时使用这种策略。不过需要担心一下内存溢出的问题。
  • 有界队列:例如使用ArrayBlockingQueue,会根据队列的size和最大线程数来调整线程池对资源的使用和吞吐量的高低。

接下来从源码层面来研究一下线程池的实现。

image

ThreadPoolExecutor自身的层级结构如图所示。最上层的执行器Executor接口本身只具备void execute(Runnable command)的能力,而ExecutorService作为一个service又必须具备提交任务、正常关闭等职能,在AbstractExecutorService实现了一些公共方法。ThreadPoolExecutor作为线程池的基础实现,其下还有若干具备特殊功能的线程池继承自ThreadPoolExecutor。

不过这篇笔记只研究ThreadPoolExecutor的实现,其他不同类别的特殊线程池的学习放在之后进行。

ThreadPoolExecutor中也涉及到一些和AbstractQueuedSynchronizer相关的操作,AbstractQueuedSynchronizer及其各种实现类可以我在并发的灵魂中有详尽的解析,在此只简单描述它的功能。

作为一个Executor的实现类,自然要从execute方法说起:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {//如果工作线程数小于核心线程数
  6. if (addWorker(command, true))//创建worker,第二个参数为true表示创建核心线程
  7. return;//创建成功则返回
  8. c = ctl.get();//创建失败获取保存的ctl
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {//判断ctl中的状态,如果线程池处于RUNNING状态,则把任务放入队列中
  11. int recheck = ctl.get();//重新确认状态
  12. if (! isRunning(recheck) && remove(command))//如果线程池已关闭,则移除任务
  13. reject(command);//移除成功调用RejectedExecutionHandler
  14. else if (workerCountOf(recheck) == 0)//如果remove失败
  15. addWorker(null, false);//自旋检查是否Q中所有任务都已经处理完成
  16. }//如果线程池已经关闭或队列已满
  17. else if (!addWorker(command, false))//尝试创建worker,并使用非核心线程
  18. reject(command);//创建失败调用RejectedExecutionHandler
  19. }

这方法就涉及到ctl属性:

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

用一个支持并发的原子Int类型来保存两个属性:工作线程数和当前线程池状态。由于两个属性共享一个原子Int类型,所以线程池共支持(229)-1,大于5亿个线程。线程池的状态共分为一下五种:

  • RUNNING
  • SHUTDOWN 这个状态不再接收新的task,不过当前正在处理队列中的task
  • STOP 这个状态不再接受新的task,并不在处理队列中的task
  • TIDYING 这个状态所有的task都已经执行完毕,工作线程数降为0,不过正在执行terminated()方法
  • TERMINATED 这个状态表示terminated()方法已经执行完毕

以上五种状态从上至下一次发生状态改变,唯一的特例在于如果当前没有task可以处理,那么RUNNING状态会直接转变为STOP状态。

这个int类型共32位,保留最左3位用来保存5个状态值,右边29位保存工作线程数,所以以下几个方法(rs表示runState,wc表示workerCount)也就非常好理解了。

  1. private static final int COUNT_BITS = Integer.SIZE - 3;
  2. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  3. // runState is stored in the high-order bits
  4. private static final int RUNNING = -1 << COUNT_BITS;
  5. private static final int SHUTDOWN = 0 << COUNT_BITS;
  6. private static final int STOP = 1 << COUNT_BITS;
  7. private static final int TIDYING = 2 << COUNT_BITS;
  8. private static final int TERMINATED = 3 << COUNT_BITS;
  9. // Packing and unpacking ctl
  10. private static int runStateOf(int c) { return c & ~CAPACITY; }
  11. private static int workerCountOf(int c) { return c & CAPACITY; }
  12. private static int ctlOf(int rs, int wc) { return rs | wc; }

回到execute方法的源码中,从源码可以看出之前流程图表示的逻辑。详细流程这次在源码中以注释方式标明。

再来详细看一下addWorker方法,终于有机会在jdk源代码中见识一下java中的“goto”了!:

  1. //方法接收两个参数,第一个参数是首个任务,可以为空,第二个参数表示是否创建在核心线程中
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3. //以下是两层自旋,为了能从内层自旋直接跳到外层自旋,所以在这里设置retry标示
  4. retry:
  5. for (;;) {
  6. int c = ctl.get();//首先获得ctl的引用
  7. int rs = runStateOf(c);//计算出当前线程池的状态
  8. // Check if queue empty only if necessary.
  9. //以下需要判断如果线程池处于非开启状态,并且队列仍然未清空时会进行自旋
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;//如果线程池关闭后队列清空,则成功返回
  15. //这里是内层自旋
  16. for (;;) {
  17. int wc = workerCountOf(c);//首先得到当前worker的数量,也就是池中线程数量
  18. if (wc >= CAPACITY ||
  19. wc >= (core ? corePoolSize : maximumPoolSize))//如果超标
  20. return false;//直接返回失败
  21. if (compareAndIncrementWorkerCount(c))//没超标则cas方式增加work数
  22. break retry;//成功后直接退出外层自旋
  23. c = ctl.get(); // Re-read ctl//如果失败的话需要reload ctl
  24. if (runStateOf(c) != rs)//如果当前状态已经改变,则重新自旋
  25. continue retry;
  26. // else CAS failed due to workerCount change; retry inner loop
  27. //如果状态没改变的话,也许有其他任务同时提交,内层自旋
  28. }
  29. }
  30. boolean workerStarted = false;
  31. boolean workerAdded = false;
  32. Worker w = null;
  33. try {
  34. final ReentrantLock mainLock = this.mainLock;//在增减Worker的时候使用锁而不使用并发容器官方的解释时,当线程池shutdown的时候为了避免线程中断的高峰,加锁后可以变为串行执行
  35. w = new Worker(firstTask);
  36. final Thread t = w.thread;
  37. if (t != null) {
  38. mainLock.lock();
  39. try {
  40. // Recheck while holding lock.
  41. // Back out on ThreadFactory failure or if
  42. // shut down before lock acquired.
  43. int c = ctl.get();
  44. int rs = runStateOf(c);//double check
  45. if (rs < SHUTDOWN || //如果线程池正在运行
  46. (rs == SHUTDOWN && firstTask == null)) { //或是shutdown后调用firstTask为null的时候
  47. if (t.isAlive()) // precheck that t is startable
  48. throw new IllegalThreadStateException();
  49. workers.add(w);
  50. int s = workers.size();
  51. if (s > largestPoolSize)//largestPoolSize用来表示线程池中曾经到达过的最大线程数
  52. largestPoolSize = s;
  53. workerAdded = true;
  54. }
  55. } finally {
  56. mainLock.unlock();
  57. }
  58. if (workerAdded) {
  59. t.start();//线程启动在这里
  60. workerStarted = true;
  61. }
  62. }
  63. } finally {
  64. if (! workerStarted)//只有在线程池处于非运行状态下才会添加失败
  65. addWorkerFailed(w);//这时除了会从works中移除work之外,还会触发tryTerminate操作,tryTerminate会在任何发现当前线程池已经关闭的时候触发,不只在执行shutdown操作的时候触发。
  66. }
  67. return workerStarted;
  68. }

addWorker方法就是这样。如果用过线程池就会发现,提交任务还有一种带返回值的方式,也就是submit,它是怎么做的呢?答案在AbstractExecutorService这个基类中。

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. public <T> Future<T> submit(Runnable task, T result) {
  8. if (task == null) throw new NullPointerException();
  9. RunnableFuture<T> ftask = newTaskFor(task, result);
  10. execute(ftask);
  11. return ftask;
  12. }
  13. public <T> Future<T> submit(Callable<T> task) {
  14. if (task == null) throw new NullPointerException();
  15. RunnableFuture<T> ftask = newTaskFor(task);
  16. execute(ftask);
  17. return ftask;
  18. }

提供了三种submit方式,其实都是构造成FutureTask,因为FutureTask实现了Runnable接口,可以直接调用上文提到的execute方法执行,最终把FutureTask对象返回给用户即可。

线程池还提供了invokeAll和invokeAny方法,用来批量提交任务,两个方法都是阻塞执行的,区别在于,invokeAll方法只有当所有任务都执行完之后才返回结果集,而invokeAny方法只要有一个任务执行完成了,就把结果返回,并取消其他未执行完成的任务。invokeAll和invokeAny都提供阻塞和带超时时间的方法。如下所示:

  1. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException;
  3. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  4. long timeout, TimeUnit unit)
  5. throws InterruptedException;
  6. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  7. throws InterruptedException, ExecutionException;
  8. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  9. long timeout, TimeUnit unit)
  10. throws InterruptedException, ExecutionException, TimeoutException;

下面来分别看看这两种方法是如何实现的。invokeAll方法:

  1. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2. throws InterruptedException {
  3. if (tasks == null)
  4. throw new NullPointerException();
  5. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6. boolean done = false;
  7. try {
  8. for (Callable<T> t : tasks) {
  9. RunnableFuture<T> f = newTaskFor(t);
  10. futures.add(f);
  11. execute(f);
  12. }
  13. for (Future<T> f : futures) {
  14. if (!f.isDone()) {
  15. try {
  16. f.get();
  17. } catch (CancellationException ignore) {
  18. } catch (ExecutionException ignore) {
  19. }
  20. }
  21. }
  22. done = true;
  23. return futures;
  24. } finally {
  25. if (!done)
  26. for (Future<T> f : futures)
  27. f.cancel(true);
  28. }
  29. }

这方法几乎不用解释,只是按提交任务的顺序,循环遍历任务列表等待任务结果,这过程中不响应任何异常,除非任务被取消,当有任务被取消时所有任务都依次被取消,抛出InterruptedException异常。

带超时时间的invokeAll方法其实也很简单,只不过在每次提交任务判断是否已经超时,获得futureTask结果时使用 f.get(nanos, TimeUnit.NANOSECONDS),当出现超时异常时取消所有任务,并返回结果集。

invokeAny是否包含超时时间的两个方法实际上都调用的时下面这个doInvokeAny方法,这个方法我以注释的方式加以分析:

  1. //第二个参数用来标识是否有超时时间
  2. private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  3. boolean timed, long nanos)
  4. throws InterruptedException, ExecutionException, TimeoutException {
  5. if (tasks == null)
  6. throw new NullPointerException();
  7. int ntasks = tasks.size();//这里会记录待提交任务数
  8. if (ntasks == 0)
  9. throw new IllegalArgumentException();
  10. List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);//创建一个用来记录任务结果的集合
  11. //这里构造了一个线程池的包装类,实现很简单,当任务执行完成后,会放入一个FIFO的队列,所以这个队里是以任务的完成先后来排序的,这一点非常重要
  12. ExecutorCompletionService<T> ecs =
  13. new ExecutorCompletionService<T>(this);
  14. try {
  15. //这里记录任务运行失败的最后一次异常信息,如果所有任务都失败了,则会抛出这个异常。
  16. ExecutionException ee = null;
  17. long lastTime = timed ? System.nanoTime() : 0;
  18. Iterator<? extends Callable<T>> it = tasks.iterator();//获得所有任务的迭代器
  19. futures.add(ecs.submit(it.next()));//执行第一个任务并加入结果集合
  20. --ntasks;//待提交任务会减少
  21. int active = 1;//正在进行的任务会增加
  22. for (;;) {//接下来进入自旋
  23. Future<T> f = ecs.poll();//这里会返回最先完成的任务
  24. if (f == null) {//如果为空
  25. if (ntasks > 0) {//而且还有任务可以提交,则进行任务的提交
  26. --ntasks;
  27. futures.add(ecs.submit(it.next()));
  28. ++active;
  29. }
  30. else if (active == 0)//如果没有任务可以提交,而且所有提交的任务都已经完成,则跳出自旋
  31. break;
  32. else if (timed) {//如果没有任务可以提交,而且需要考虑超时时间,则在一定时间内等待有任务完成
  33. f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  34. if (f == null)//超时则抛出超时异常
  35. throw new TimeoutException();
  36. long now = System.nanoTime();
  37. nanos -= now - lastTime;
  38. lastTime = now;//获得任务后进行超时时间的计算
  39. }
  40. else//如果不需要考虑超时时间,则阻塞的等待第一个任务完成
  41. f = ecs.take();
  42. }
  43. if (f != null) {//当有任务完成时
  44. --active;//正在进行的任务数会减少
  45. try {
  46. return f.get();//如果能顺利获得结果,则直接返回,如果出现任何异常,则进入自旋,尝试等待下一个任务完成
  47. } catch (ExecutionException eex) {
  48. ee = eex;
  49. } catch (RuntimeException rex) {
  50. ee = new ExecutionException(rex);
  51. }
  52. }
  53. }
  54. //当所有任务都执行完成,且没有任何任务成功时,会抛出异常
  55. if (ee == null)
  56. ee = new ExecutionException();
  57. throw ee;
  58. } finally {//如果某一个任务成功执行完毕,则取消所有任务
  59. for (Future<T> f : futures)
  60. f.cancel(true);
  61. }
  62. }

以上就是线程池的批量执行方法的分析。

创建了Worker就需要执行,接下来看看Worker是如何执行的:

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. final Thread thread;
  6. Worker(Runnable firstTask) {
  7. setState(-1); // inhibit interrupts until runWorker
  8. this.firstTask = firstTask;
  9. this.thread = getThreadFactory().newThread(this);
  10. }
  11. public void run() {
  12. runWorker(this);
  13. }
  14. ...
  15. }

Worker本身采用独占模式继承了AbstractQueuedSynchronizer,实现了Runnable接口。每个Worker对象又会包装一个thread,所以每个worker就变成了可重用的,而且又是独占式的。runWorker方法如下:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

runWorker方法会持续不断的从Queue中获得任务并执行,runWorker有以下几点需要注意:

  1. 可以看到: Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { ... } worker会首先尝试获得firstTask,如果firstTask不存在也没关系,worker会持续不断的通过getTask方法从Queue中获得下一个任务。如果getTask方法没有获得任何任务,worker会自动推出并执行processWorkerExit方法。
  2. 每次执行任务前会加锁,并且通过Thread.interrupted()清楚中断状态,保证只要线程池不关闭,任务肯定会继续执行。
  3. 任务执行前后会执行扩展方法beforeExecute(wt, task)和afterExecute(task, thrown),值得注意的是beforeExecute执行过程中抛出的异常不会被catch住,所以beforeExecute如果有异常抛出,任务不会执行。是所有RuntimeException会交给afterExecute处理最后抛出。值得注意的是由于Error不会强制被catch的特殊性,任何其他Throwable都会封装成Error,和Error一样在afterExecute处理过后抛出。

还记得线程池里设置了每个线程的存活时间吗,来看看getTask方法,具体逻辑解释以注释方式给出:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. retry:
  4. for (;;) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. //每次尝试获取任务之前
  8. //都首先会判断一下是否需要获取任务,如果当前线程池已经开始关闭,或者队列为空就不用获取了
  9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  10. decrementWorkerCount();//这时设置worker数减1,这个线程就要关闭了
  11. return null;
  12. }
  13. boolean timed; // Are workers subject to culling?
  14. for (;;) {
  15. int wc = workerCountOf(c);
  16. timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池允许线程 timeout或者当前线程数大于核心线程数,则会进行timeout的处理
  17. //如果线程数小于最大线程数,并且不需要做timeout判断则直接跳出
  18. if (wc <= maximumPoolSize && ! (timedOut && timed))
  19. break;
  20. if (compareAndDecrementWorkerCount(c))//否则,也就是已经timeout了,通过cas削减worker数,并返回null意味着线程即将关闭
  21. return null;
  22. //如果cas削减worker数失败
  23. c = ctl.get(); // Re-read ctl
  24. if (runStateOf(c) != rs)//double check一下当前状态是不是被其他线程改动过
  25. continue retry;//如果改动过,则重新执行以上逻辑
  26. //如果状态没改动过,cas设置失败应该是由于worker数被其他线程修改过,在内部自旋重新判断
  27. }
  28. //如果确实需要获取任务
  29. try {
  30. Runnable r = timed ?
  31. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  32. workQueue.take();
  33. if (r != null)//成功获取后返回
  34. return r;
  35. timedOut = true;
  36. } catch (InterruptedException retry) {
  37. timedOut = false;
  38. }
  39. }
  40. }

有开始就有结束,接下来看看worker怎么终结自己的,当runWorker跳出while循环后会执行finally中的processWorkerExit:

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks;
  8. workers.remove(w);
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) {
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && ! workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min)
  20. return; // replacement not needed
  21. }
  22. addWorker(null, false);
  23. }
  24. }

在worker关闭之前首先要判断completedAbruptly,这个参数为true,则表明worker是由于执行任务时抛出异常才无奈关闭,如果为false,则是由于没有新的任务可以获取,空闲时间太长而关闭。因为空闲而关闭在getTask方法中已经做了worker数削减的操作了,所以在这里不再削减。

这之后会在锁内完成从workers中删除worker的操作,接下来在有可能终止线程池的地方尝试终止。最后要根据当前线程池worker的数量以及核心线程数,Queue中剩余任务数来决定是否创建新的worker。

worker从创建,工作,到销毁的整个生命周期就分析完了。

再来看看线程池是如何关闭的:

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. advanceRunState(SHUTDOWN);
  7. interruptIdleWorkers();
  8. onShutdown(); // hook for ScheduledThreadPoolExecutor
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. }

shutdown方法是优雅关闭的方法,所有已经提交的任务会正常执行,关闭之后提交的任务不会被接受。可以看到所有关闭操作都是在mainLock中进行的。

首先会通过SecurityManager来判断是否允许checkShutdownAccess。如果允许或是SecurityManager没有设置。advanceRunState方法如下:

  1. private void advanceRunState(int targetState) {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (runStateAtLeast(c, targetState) ||
  5. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  6. break;
  7. }
  8. }

这里会自旋判断当前状态,并通过CAS设置状态为关闭,如果有其他线程同时改变了状态导致设置状态失败,则会进入自旋,直到状态设置成功为止。

  1. private void interruptIdleWorkers() {
  2. interruptIdleWorkers(false);
  3. }
  4. private void interruptIdleWorkers(boolean onlyOne) {
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. for (Worker w : workers) {
  9. Thread t = w.thread;
  10. if (!t.isInterrupted() && w.tryLock()) {
  11. try {
  12. t.interrupt();
  13. } catch (SecurityException ignore) {
  14. } finally {
  15. w.unlock();
  16. }
  17. }
  18. if (onlyOne)
  19. break;
  20. }
  21. } finally {
  22. mainLock.unlock();
  23. }
  24. }

之后调用interruptIdleWorkers方法中断所有空闲的线程。因为每一个worker都包含一个独占锁,worker执行任务时会持有这个独占锁,所以这里只会中断没有在工作的worker。

shutdown方法接下来会执行一个扩展点onShutdown,最后执行tryTerminate尝试关闭线程池。tryTerminate方法如下:

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (isRunning(c) ||
  5. runStateAtLeast(c, TIDYING) ||
  6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  7. return;
  8. if (workerCountOf(c) != 0) { // Eligible to terminate
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  16. try {
  17. terminated();
  18. } finally {
  19. ctl.set(ctlOf(TERMINATED, 0));
  20. termination.signalAll();
  21. }
  22. return;
  23. }
  24. } finally {
  25. mainLock.unlock();
  26. }
  27. // else retry on failed CAS
  28. }
  29. }

这个方法进入自旋之后首先会判断当前状态是否可以终止,例如如果是running状态则无需执行终止。

之后的这段逻辑略显晦涩:

  1. if (workerCountOf(c) != 0) { // Eligible to terminate
  2. interruptIdleWorkers(ONLY_ONE);
  3. return;
  4. }

总的来说就是如果当前worker仍然存活,就尝试中断任何一个空闲的worker,这里只中断一个worker,为的是让这个worker在终止之前执行processWorkerExit,这样会再次调用tryTerminate方法,这样就把关闭的信息传播下去,达到优雅关闭的目的。

在tryTerminate的最后会在锁内用CAS把状态置为TIDYING,执行扩展方法terminated(),之后状态被设为TERMINATED,并signal所有等待线程池关闭的线程。在第一次执行CAS操作失败就会进入自旋,知道成功为止。

除了优雅关闭之外,还有强制关闭的方法:

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. interruptWorkers();
  9. tasks = drainQueue();
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks;
  15. }

这个方法和优雅关闭shutdown方法前半段都差不多,只不过最后会中断所有worker,并把仍然在Queue中等待执行的任务打包返回给用户。

了解完了最基础的线程池之后,来看看各种各样具备特殊职能的线程池。

ScheduledThreadPoolExecutor

接下来关注一下ScheduledThreadPoolExecutor,这个线程池继承了ThreadPoolExecutor可以执行定时任务,也就是类似timer的作用不过由于是线程池,可以一次执行多个任务,节约线程资源。

一般不会直接使用ScheduledThreadPoolExecutor的构造函数,因为Executors工具类提供了更便捷的使用方式:

  1. Executors.newSingleThreadScheduledExecutor();
  2. Executors.newScheduledThreadPool(1);

这个方法创建了一个

  1. class Executors...
  2. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  3. return new DelegatedScheduledExecutorService
  4. (new ScheduledThreadPoolExecutor(1));
  5. }
  6. static class DelegatedScheduledExecutorService
  7. extends DelegatedExecutorService
  8. implements ScheduledExecutorService {
  9. private final ScheduledExecutorService e;
  10. DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
  11. super(executor);
  12. e = executor;
  13. }
  14. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  15. return e.schedule(command, delay, unit);
  16. }
  17. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  18. return e.schedule(callable, delay, unit);
  19. }
  20. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
  21. return e.scheduleAtFixedRate(command, initialDelay, period, unit);
  22. }
  23. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  24. return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
  25. }
  26. }

由于声明的是ScheduledExecutorService接口,这样就不会暴露出线程池自身的一些高级方法。而且,作为处理定时任务的线程池,任务数一般不会太多,所以没有必要使用一个以上的线程。

ScheduledThreadPoolExecutor提供了4个构造函数,分别可以设定corePoolSize,threadFactory,rejectedExecutionHandler。唯独不能设定阻塞队列,因为ScheduledThreadPoolExecutor必须使用阻塞队列DelayedWorkQueue,不过有一点不同导致他没有继承DelayQueue,而是重写一个,因为定时任务的业务逻辑导致不得不在堆中记录每个任务的index。这样做有以下几点好处:

  1. 当取消任务时把查找的时间复杂度从O(n)降为O(log n)
  2. 当任务在堆内移动时提升了gc效果

尝试向线程池提交一个任务,我们可以选择延迟一段时间再执行:

  1. pool.schedule(new Callable()[T]{
  2. ...
  3. }, 10L, TimeUnit.SECONDS)

这个方法实现如下:

  1. class ScheduledThreadPoolExecutor...
  2. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  3. if (callable == null || unit == null)//判断非空
  4. throw new NullPointerException();
  5. RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//使用callable和触发任务执行时间构造RunnableScheduledFuture对象
  6. delayedExecute(t);//延迟执行
  7. return t;
  8. }

重点看看delayedExecute方法:

  1. class ScheduledThreadPoolExecutor...
  2. private void delayedExecute(RunnableScheduledFuture<?> task) {
  3. if (isShutdown()) //如果关闭,则使用handler拒绝任务
  4. reject(task);
  5. else { //如果一切正常
  6. super.getQueue().add(task); //把task添加到延迟队列
  7. if (isShutdown() &&
  8. !canRunInCurrentRunState(task.isPeriodic()) && remove(task))//再次判断如果已经shutdown,如果不需要在关闭后仍然执行周期任务,则把任务移除
  9. task.cancel(false);//任务取消
  10. else //如果一切正常,保证线程池存在至少一个线程
  11. ensurePrestart();
  12. }
  13. }

与[转帖]我所知道的线程池相似的内容:

[转帖]我所知道的线程池

https://bigbully.github.io/%E7%BA%BF%E7%A8%8B%E6%B1%A0 线程池其实或多或少都用过,不过这是我第一次阅读它的源码,包括源码附带的非常详尽的注释。发现我之前对于线程池的理解还是很浅薄的。 其实从ThreadPoolExecutor.java顶部200

[转帖]我所理解的SRE、PE和应用运维

https://www.cnblogs.com/zhangxinglong/p/14756366.html SRE这个概念我个人印象中应该14年下半年左右听到的,当时只知道是Google对运维岗位定义,巨牛逼的一个岗位,在网上查到SRE是叫网站稳定工程师,只要是保障稳定为主,其他就没有更深的意识了。

[转帖]高性能网络 | 你所不知道的 TIME_WAIT 和 CLOSE_WAIT

高性能网络 | 你所不知道的 TIME_WAIT 和 CLOSE_WAIThttps://my.oschina.net/fdhay/blog/638631 本文是我将最近两篇文章,重新整理成一篇,方便收藏。如果你已经阅读过前两篇,并且已经做了收藏,可以重新收藏本文即可。 你有收藏和整理文章的习惯吗?

[转帖]高性能网络 | 你所不知道的TIME_WAIT和CLOSE_WAIT

https://zhuanlan.zhihu.com/p/528747315 你遇到过TIME_WAIT的问题吗? 我相信很多都遇到过这个问题。一旦有用户在喊:网络变慢了。第一件事情就是,netstat -a | grep TIME_WAIT | wc -l 一下。哎呀妈呀,几千个TIME_WAIT

【转帖】通过docker配置DNS服务

https://blog.whsir.com/post-3185.html 在办公室开发人员经常会测试所写的页面,每次都要输入对应的IP地址或者更改hosts,为了让开发大爷省心,不如搭建一个dns服务,将所需要测试的网页直接解析成域名,让开发大爷自己选域名,想用啥就用啥,我这里通过docker配置

[转帖]mysql 千万数据迁移的几种方式

最近因为业务需求,我们需要将我们的订单表(一千三百万数据,并且每天已五万条速度增加)已订单类型分组迁移到新的业务表中,以降低我们订单表的大小,同时暂时杜绝订单表所带来的数据瓶颈问题,需求下来了,基本思路也要确定下来,我打算先将三天前的历史数据先跑到表里,待整个业务线迁移过后,我再将剩下的数据跑进去,

[转帖]linux磁盘IO读写性能优化

在LINUX系统中,如果有大量读请求,默认的请求队列或许应付不过来,我们可以 动态调整请求队列数来提高效率,默认的请求队列数存放在/sys/block/xvda/queue/nr_requests 文件中,注意:/sys/block/xvda ,这里 xvda 写的是你自己的硬盘名,因我的是vps所

[转帖]ping、arp、tracert、route这四大命令的详细用法,用途很广

https://zhuanlan.zhihu.com/p/460719455 在网络中ping是一个十分强大的TCP/IP工具。它的作用主要为: 1、用来检测网络的连通情况和分析网络速度 2、根据域名得到服务器IP 3、根据ping返回的TTL值来判断对方所使用的操作系统及数据包经过路由器数量。 我

[转帖]我偷偷学了这 5 个命令,打印 Linux 环境变量那叫一个“丝滑”!

https://xie.infoq.cn/article/2acfef8d543517619c9202e4e 一、概述 在基于 Linux 和 Unix 的系统中,环境变量是一组动态命名值,存储在系统中,供在 shell 或子 shell 中启动的应用程序使用。简单来说,环境变量是具有名称和关联值的

[转帖]我63岁,夫妻俩退休工资15000,回农村养老不到一年,落荒而逃

[一种声音]我63岁,夫妻俩退休工资15000,回农村养老不到一年,落荒而逃 xilei 发布于 2023-2-14 9:36:00 我63岁,夫妻俩退休工资15000,回农村养老不到一年,我们落荒而逃 1、 我是老袁,江西人,一个儿子在上海工作,儿子和儿媳都有不错的单位。孙子住寄宿学校,也不需要我