Java多线程-ThreadPool线程池-2(四)

java,多线程,threadpool,线程 · 浏览次数 : 178

小编点评

**ArrayBlockingQueue测试** ```java public class ArrayBlockingQueueTester { public static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5); // 一个往里放 class Producer implements Runnable { @Override public void run() { try { queue.put(\"川菜\"); System.out.println(Thread.currentThread().getName() + \" 厨师做好川菜\"); } catch (InterruptedException e) { e.printStackTrace(); } } } // 一个往外拿 class Consumer implements Runnable { @Override public void run() { try { String food = queue.take(); System.out.println(Thread.currentThread().getName() + \" 客人消费 \" + food); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { // 要处理的线程数过大时,是否抛出异常,取决于机器的性能 ExecutorService service = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10000; i++) { service.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()); }); } service.shutdown(); } } ``` **PriorityBlockingQueue测试** ```java public class PriorityBlockingQueueTest { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { service.execute(new Test1(i)); } service.shutdown(); } } ``` **自定义ThreadFactory** ```java public class SelfDefineThreadPoolExecutor { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(8), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() { @Override public Thread newThread(Runnable r) { System.out.println("线程 " + r.hashCode() + " 创建"); return new Thread(r, "thread-pool-" + r.hashCode()); } }); for (int i = 0; i < 10; i++) { service.execute(new Test2(\"Test2\" + i)); } service.shutdown(); } } ```

正文

线程池是个神器,用得好会非常地方便。本来觉得线程池的构造器有些复杂,即使讲清楚了对今后的用处可能也不太大,因为有一些Java定义好的线程池可以直接使用。但是(凡事总有个但是),还是觉得讲一讲可能跟有助于理解后面的常用线程池,所以该打脸还是打吧

因为直接结合代码看会更清楚一些,所以我把带注释的代码贴出来:

public class ThreadPoolExecutor {
    public ThreadPoolExecutor(
    /**
             * corePoolSize:初始化时指定的核心线程数,包括空闲线程,必须大于等于0,当有新任务提交时,会执行以下判断(workCount为当前活跃的线程数量):
             * 当workCount< corePoolSize:即使线程池中有空闲线程,也会创建新线程
             * 当corePoolSize ≤ workCount < maximumPoolSize:只有workQueue满时才创建新线程
             * 当corePoolSize < workCount < maximumPoolSize:且超过corePoolSize部分的线程空闲时间达到keepAliveTime时,就回收这些线程,当设置allowCoreThreadTimeOut(true)时,
             *                                               线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将被回收
             * 当设置corePoolSize == maximumPoolSize:线程池的大小固定,此时如有新任务提交,且workQueue未满时,会将请求放入workQueue,等待有空闲的线程从workQueue中取任务并处理
             * 当workCount ≥ maximumPoolSize:若workQueue满,则采取handler对应的策略
             */
    int corePoolSize,
                // maximumPoolSize:初始化时指定的最大线程数量
    int maximumPoolSize,
                // keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是等待,直到等待的时间超过了keepAliveTime
    long keepAliveTime,
                // 空闲时间单位
    TimeUnit unit,
    /**
             * workQueue:阻塞队列的类型是保存等待执行的任务的阻塞队列,主要有四种提交方式:
             * SynchronousQueue:同步队列,这个“队列”内部只包含了一个元素,队列的size始终为0,每执行一个put,就需要一个take来解除阻塞,反之也一样。饱和状态下,线程池能处理的最大线程数量为maximumPoolSize
             *      使用SynchronousQueue队列,提交的任务不会保存,而是会马上提交执行
             *      需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略
             * ArrayBlockingQueue:有界任务队列,饱和状态下,线程池能处理的最大线程数量为maximumPoolSize + ArrayBlockingQueue.SIZE
             * LinkedBlockingQueue:无界任务队列,线程池的任务队列可以无限制的添加新的任务,此时线程池能够创建的最大线程数是corePoolSize,
             *                      而maximumPoolSize就无效了,线程池饱和状态下能处理的最大线程数量只取决于系统的性能
             * PriorityBlockingQueue:优先任务队列,同LinkedBlockingQueue一样,它也是一个无界的任务队列,只不过需要自己实现元素的Comparable排序接口
             */
    BlockingQueue<Runnable> workQueue,
                // threadFactory:创建新线程,使新创建的线程有相同的优先级且为非守护线程,同时设置线程的名称,默认使用Executors.DefaultThreadFactory类创建
    ThreadFactory threadFactory,
    /**
             * handler:表示线程池的饱和策略,意思就是如果阻塞队列满了并且没有空闲的线程,此时如果继续提交任务,就需要采取一种策略处理该任务,线程池提供了4种策略
             * AbortPolicy:直接抛出异常,这是默认策略
             * CallerRunsPolicy:如果线程池的线程数量达到上限,则把任务队列中的任务放在调用者的线程当运行
             * DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
             * DiscardPolicy:直接丢弃任务
             */
    RejectedExecutionHandler handler) {
        // balabala… …
    }
}

这样就清晰多了。

其中,最主要是要清楚几种workQueue,也就是BlockingQueue<Runnable>的作用。

 

SynchronousQueue同步队列,这个队列没有所谓的缓冲,这样做是为了排除阻塞时队列丢消息的可能。如果没有其他微服务并行执行的话,可以放心地用这个队列,不然还是小心一点为妙。它的示例代码:

/**
 * 同步队列
 */
public class SynchronousQueueTest {

  public static void main(String[] args) {
    ExecutorService service = new ThreadPoolExecutor(
      1,
      // 当要处理的线程数超过maximumPoolSize时,抛出异常
      2,
      1000,
      TimeUnit.MILLISECONDS,
      new SynchronousQueue<Runnable>(),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 0; i < 10; i++) {
      service.execute(() ->
        System.out.println("当前线程 " + Thread.currentThread().getName())
      );
    }
    service.shutdown();
  }
}

ArrayBlockingQueue,它的使用范围非常广,一般可以用于轻量级的同步锁,也就是在同一个服务中(也就是非微服务架构),如果要具有分布式锁的功能又不想部署zookeeper这么麻烦的话,ArrayBlockingQueue就是一个非常不错的选择。

/**
 * 有界阻塞队列
 */
public class ArrayBlockingQueueTest {

  public static void main(String[] args) {
    ExecutorService service = new ThreadPoolExecutor(
      // 要处理的线程数超过maximumPoolSize  + workQueue.SIZE时,抛出异常
      1,
      2,
      1000,
      TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(10),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.AbortPolicy()
    );
    // 因为 maximumPoolSize(2) + ArrayBlockingQueue.SIZE(10) < 13,所以会抛出异常
    for (int i = 0; i < 13; i++) {
      service.execute(() ->
        System.out.println("当前线程 " + Thread.currentThread().getName())
      );
    }
    service.shutdown();
  }
}

再来看看ArrayBlockingQueue的另一个例子,可以加深印象:

public class ArrayBlockingQueueTester {

  public static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);

  // 一个往里放
  class Producer implements Runnable {

    @Override
    public void run() {
      try {
        queue.put("川菜");
        System.out.println(Thread.currentThread().getName() + " 厨师做好 川菜");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  // 一个往外拿
  class Consumer implements Runnable {

    @Override
    public void run() {
      try {
        String food = queue.take();
        System.out.println(
          Thread.currentThread().getName() + " 客人消费 " + food
        );
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String[] args) {
    // 客人等着菜
    for (int i = 0; i < 5; i++) {
      new Thread(new ArrayBlockingQueueTester().new Consumer()).start();
    }

    // 厨师做好菜
    for (int i = 0; i < 5; i++) {
      new Thread(new ArrayBlockingQueueTester().new Producer()).start();
    }
  }
}

ArrayBlockingQueue说白了就是一个往里放,一个往外拿:

1、往里放的,只能最多放指定个数就不能再放了(阻塞,等待,这里是5个);

2、往外拿的,如果没有可以拿的了,就等着(阻塞,等待)。

咱们点菜的时候不就是这样吗?

LinkedBlockingQueue这个就牛逼了,相当于无底洞,有多少处理多少,此时线程池能够创建的最大线程数是corePoolSize,而maximumPoolSize就成了摆设,这等于说是完全取决于系统的性能。

/**
 * 无界阻塞队列
 */
public class LinkedBlockingQueueTest {

  public static void main(String[] args) {
    // 要处理的线程数过大时,是否抛出异常,取决于机器的性能
    ExecutorService service = new ThreadPoolExecutor(
      1,
      2,
      1000,
      TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>(),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 0; i < 10000; i++) {
      service.execute(() ->
        System.out.println("当前线程 " + Thread.currentThread().getName())
      );
    }
    service.shutdown();
  }
}

最后一个队列是PriorityBlockingQueue,它是一种有优先级的无界阻塞队列,默认的元素执行顺序是升序,可以通过自定义接口Comparable<T>实现compareTo()方法来指定队列中的元素执行顺序。

/**
 * 测试类
 */
public class Test1 implements Runnable, Comparable<Test1> {

  private int priority;

  public Test1(int priority) {
    this.priority = priority;
  }

  public int getPriority() {
    return priority;
  }

  public void setPriority(int priority) {
    this.priority = priority;
  }

  @Override
  public int compareTo(Test1 o) {
    // 返回1时为升序
    // 返回-1为降序
    return this.priority > o.priority ? -1 : 1;
  }

  @Override
  public void run() {
    System.out.println(
      "当前线程 " +
      Thread.currentThread().getName() +
      ", priority = " +
      this.priority
    );
  }
}

/**
 * 有优先级的无界阻塞队列
 */
public class PriorityBlockingQueueTest {

  public static void main(String[] args) {
    ExecutorService service = new ThreadPoolExecutor(
      1,
      2,
      1000,
      TimeUnit.MILLISECONDS,
      new PriorityBlockingQueue<>(),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 0; i < 10; i++) {
      service.execute(new Test1(i));
    }
    service.shutdown();
  }
}

如果想在线程池的执行线程中加入一点自己希望的动作,可以通过自定义ThreadFactory实现。

/**
 * 测试类
 */
public class Test2 implements Runnable {
    private String name;

    public Test2(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(this.getName() + ",当前线程 " + Thread.currentThread().getName());
    }
}



/**
 * 自定义ThreadFactory
 */
public class SelfDefineThreadPoolExecutor {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(
                1,
                2,
                1000,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(8),
                // 自定义ThreadFactory
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        System.out.println("线程 " + r.hashCode() + " 创建");
                        return new Thread(r, "thread-pool-" + r.hashCode());
                    }
                },
                // 加入自定义动作
                new ThreadPoolExecutor.CallerRunsPolicy()
        ) {
            public void beforeExecute(Thread thread, Runnable runnable) {
                System.out.println(((Test2) runnable).getName() + " 准备执行");
            }
            public void afterExecute(Thread thread, Runnable runnable) {
                System.out.println(((Test2) runnable).getName() + " 执行完毕");
            }
            public void terminated() {
                System.out.println("线程池关闭");
            }
        };
        for (int i = 0; i < 10; i++) {
            service.execute(new Test2("Test2" + i));
        }
        service.shutdown();
    }
}

其实主要是把常用那几个workQueue搞搞清楚,因为这几个在今后的工作中可能会用到,尤其是ArrayBlockingQueue,它和后面会说的另两个神器,可以说是是「线程三宝」。

 

与Java多线程-ThreadPool线程池-2(四)相似的内容:

Java多线程-ThreadPool线程池-2(四)

线程池是个神器,用得好会非常地方便。本来觉得线程池的构造器有些复杂,即使讲清楚了对今后的用处可能也不太大,因为有一些Java定义好的线程池可以直接使用。但是(凡事总有个但是),还是觉得讲一讲可能跟有助于理解后面的常用线程池,所以该打脸还是打吧 因为直接结合代码看会更清楚一些,所以我把带注释的代码贴出

Java多线程-ThreadPool线程池-1(三)

开完一趟车完整的过程是启动、行驶和停车,但老司机都知道,真正费油的不是行驶,而是长时间的怠速、频繁地踩刹车等动作。因为在速度切换的过程中,发送机要多做一些工作,当然就要多费一些油。 而一个Java线程完整的生命周期就包括: 1、T1:创建(启动) 2、T2:运行(行驶) 3、T3:销毁(停车) 而T

Java多线程-ThreadPool线程池-3(五)

除了可以通过ThreadPoolExecutor自定义线程池外,同Stream API中的Collectors一样,多线程里的Executors类也提供了一组相关的线程池工具,可以直接拿来用,不用考虑用什么队列合适的问题。 Javac除了传统的四大线程池工具: 1、newFixedThreadPoo

Java多线程-JUC-1(八)

前面把线程相关的生命周期、关键字、线程池(ThreadPool)、ThreadLocal、CAS、锁和AQS都讲完了,现在就剩下怎么来用多线程了。而要想用好多线程,其实是可以取一些巧的,比如JUC(好多面试官喜欢问的JUC,就是现在要讲的JUC)。JUC就是java.util.concurrent的

Java多线程-ThreadLocal(六)

为了提高CPU的利用率,工程师们创造了多线程。但是线程们说:要有光!(为了减少线程创建(T1启动)和销毁(T3切换)的时间),于是工程师们又接着创造了线程池ThreadPool。就这样就可以了吗?——不,工程师们并不满足于此,他们不把自己创造出来的线程给扒个底朝天决不罢手。 有了线程关键字解决线程安

Java多线程

一.线程的生命周期及五种基本状态 关于Java中线程的生命周期,首先看一下下面这张较为经典的图: 上图中基本上囊括了Java中多线程各重要知识点。掌握了上图中的各知识点,Java中的多线程也就基本上掌握了。主要包括: Java线程具有五中基本状态 新建状态(New):当线程对象对创建后,即进入了新建

Java多线程生成波场靓号

​ 玩区块链,手上没靓号怎么行。用网上的靓号生成器有一定的风险性,思来想去决定自己写一个。首先需要导入波场官方编辑 org.tron.trident utils

java多线程编程:你真的了解线程中断吗?

java.lang.Thread类有一个 interrupt 方法,该方法直接对线程调用。当被interrupt的线程正在sleep或wait时,会抛出 InterruptedException 异常。事实上, interrupt 方法只是改变目标线程的中断状态(interrupt status),...

杰哥教你面试之一百问系列:java多线程

java多线程是java面试中的高频问题,如何才能在面试中脱颖而出呢?熟读这里的一百个java多线程面试问题即可。 ### 1. **什么是线程?什么是进程?** **回答:** - 线程是操作系统能够进行调度的最小执行单位,它包含在进程中,共享进程的资源。 - 进程是一个正在执行中的程序,它包含了

JAVA多线程并发编程-避坑指南

本篇旨在基于编码规范、工作中积累的研发经验等,整理在多线程开发的过程中需要注意的部分,比如不考虑线程池参数、线程安全、死锁等问题,将会存在潜在极大的风险。并且对其进行根因分析,避免每天踩一坑,坑坑不一样。