Loading...
墨滴

黎杜

2021/08/25  阅读:37  主题:默认主题

八股文线程池长文预警

上一篇文章给大家带来线程池的上半部分,这一篇给大家带来线程池的下半部分,包括源码分析、设计模式分析、面试总结

为了完整性,我把上半部分也放在了这篇文章里面,看过的可以直接看从源码部分往后看

这里还是感谢一下大家,前面写了Mysql的技术总结,一下子把我的这篇技术文带小火了一下,达到了5000多的阅读,关注人数直接起飞:

我不会到是不是被腾讯给推荐了,还是有大号推荐了,这几天一直往上涨,有没有人告诉这是咋回事,哈哈哈,文章还是推荐给大家,看过的就直接忽略吧:

这一期给我大家带来线程池,详细的聊一聊线程池,线程池在大厂的面试中也是高频率被问到。

而且,如果说你的简历上有写着线程池的使用的案例场景以及调优的经验,基本上你的这份简历会受到大厂面试官的青睐。

多线程一直是一块难啃的骨头,但是又是非常重要的一块内容。说它难啃主要是因为多线程开发中,需要考虑的东西很多。

比如:何如在多线程并发写的情况下,保证你的数据一致性,不会别其他线程覆盖,很多人也会考虑到使用锁(分布式锁redisLock、单机版锁、乐观锁、cas)来保证数据的一致性。

但是,同时你也要考虑QPS,不要因为加了锁,后面导致系统的整体吞吐量大大下降,后面压测过不了,就有的你忙了。

所以说,掌握了多线程编程对于写代码提升是非常大的,考的的东西也会比别人更多,而且可能还会出现一些奇奇怪怪的bug,排查起来非常困难,对于人的技术的考研是非常大的。

好了废话不多说,直接上干货。

线程池基本介绍

首先,来聊一聊啥是线程池,引用百度百科的解释:

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

用简单通俗的话来说,线程池就是类似于一种池子,里面养了很多的线程,再有任务的时候,可以直接拿取池子里面的线程来执行任务,用完之后,线程还给线程池,在下一次又有任务的时候同样这样执行。

之所以用“”这个关键字,原因是他比正常的线程来说,线程池可以复用,在此使用的时候不用再次创建。

直接从线程池的功能角度来解释就是如此简单,我们还接触过其他的类似于这种的池化技术。比如:数据库连接池、HttpClient连接池、内存池、实例池等等。

我们可以发现,在这些所谓的池化思想,都存在很多共性:预先分配、循环使用、复用

比如,连接池预先申请数据库连接,连接的复用,内存池预先分配内存,提高内存的分配效率,减少内存碎片等。

最后,还有很重要的关于作者方面就是要知道线程池的作者,他就是鼎鼎大名的:Doug Lea,并发编程大神,学Java的人都知道他(编程不识Doug Lea,写尽java也枉然)。

为什么使用线程池

那么什么使用线程池呢?其实理由答案都很简单,线程池相比传统的直接创建线程肯定是有优点。

我们学JVM的时候有聊到线程私有和线程共有的区域,至少单独的创建线程,就会单独给线程创建(虚拟机栈、程序计数器、本地方法栈),这些都是要占用内存的。

所以线程池的第一个优点就是:能够控制服务器资源,应该说合理的分配服务器资源,不至于过高的QPS,导致服务器资源分配完,从而导致整个服务器瘫痪

另外的优点就是:线程复用,因为反复的创建和销毁线程对于性能的消耗也是有影响的,线程池反而能够降低资源的消耗

最后一个就是优点也是我们最终的目的:就是为了优化系统,在大多数的情况下,线程池相比串行化的操作,异步的执行我们的任务,由原来的串行操作,修改成异步操作,降低了系统的响应时间

所以,对于线程池最常用的一个操作和场景就是:把原来很多串行的查询操作,可以修改成异步,然后在服务层做聚合,返回给前端,提系统的响应时间(前提上下文之间没有数据的依赖)。

那么说了那么多的线程池的优点,线程池有啥缺点呢?缺点还是挺多的,比如:线程池的数量配置的不合理,会导致系统资源的耗尽、可能直接导致系统出现OOM异常

另外的话使用线程池,也会带来多线程的问题,比如:数据的一致性、业务的复杂性、测试的复杂性(一般线程池的使用,都要结合测试,不断的进行压测,然后观察内存和CPU的变化影响怎么样)

所以,我们在使用线程池的时候,必须要求用其利,避其害,精通线程的的底层原理,对于我们更加合理有效的使用线程池是非常有帮助的。

那么怎么才算是精通线程池呢?我个人认为主要掌握这几方面:

  1. 所使用的技术点(AQS、CAS、Reentrantlock
  2. 常见的线程池的种类
  3. 设计模式(生产者-消费者模式、策略模式
  4. 设计的思想(弹性、可伸缩
  5. 执行的原理
  6. 源码的阅读
  7. 线程池的调优(线程数的设置

是不是感觉有点迷糊,要是在面试中,这几方面都被面试官问道,估计那可不好回答,比较常问的就是:线程池的执行原理、线程池的调优,这也是技术博主写的最多的地方。

所以,一个线程池就能聊上一上午,不得不说Doug Lea YYDS。线程池大的方向分为自定义线程池以及默认的几类线程池,下面我们先来聊一聊自定义的线程池。

自定义线程池

Java中的自定义线程池在java.util.concurrentThreadPoolExecutor 来实现,类图如下所示:

Executor:Executor是顶层的定义接口,它只定义了一个void execute(Runnable command) 方法,并且需要传入Runnable 类型的参数,也就是执行的任务。

ExecutorService:ExecutorService是Executor的扩展子接口,提供线程池的一些生命周期的一些方法,比如:isShutdown、isTerminated、shutdownNow、shutdown,为了方便理解,整理了一下简洁的源码,具体的源码如下:

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

这里要讲一下的两点,第一点就是shutdown和shutdownNow的区别就是shutdownNow是业务有损的,不管线程的任务是否执行完毕,立刻停止线程,简单粗暴。

而shutdown则是优雅的停止线程,当发出这个命令的时候,若是有些线程还在执行任务,会等待线程执行任务完毕才停止。

第二点就是:上面分析了Executor接口里面有一个执行任务的void execute(Runnable command)方法,而在ExecutorService中也扩展了一个submit的执行任务的方法

区别就是submit可以支持CallableRunnable 两种,而execute仅仅支持Runnable,学过线程的都知道,在创建基本线程的方式中有CallableRunnable,区别就是Callable可以结合Future获取线程的返回值。

ExecutorService的子类AbstractExecutorService实现,代码非常简单,如下所示:

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

从上面的代码中可以看到,其实submit的底层实现也是依赖于execute方法的

ThreadPoolExecutor:这个类是实现自定义线程池的关键类,最重要的是它的构造方法,如下图所示:

下面我们来详细的分析一下,最复杂的那个构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
         // 非法参数校验
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // workQueue&&threadFactory &&handler 都不能为空
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize :它表示核心线程数,为啥叫做核心线程数呢?因为还有非核心线程数,核心线程数+非核心线程数=最大线程数maximumPoolSize

核心线程数和非核心线程数,你可以理解为一个公司的正式员工和外包员工,正式员工是常驻员工(线程存活、复用),而外包正式员工是临时外聘的。

平时一般任务比较少了,正式员工可以完成任务,就不外聘外包员工,但是总有正式员工忙不过来的时候,此时就临时外聘外包员工完成超出的任务,当任务又比较少的时候,此时,外包员工空闲一段时间就会被hr辞掉,为了就是节约成本,在线程数上能够有弹性的伸缩。

对应互联网的场景就是,经过经验测试发现,在大多数的情况下,可能会存在短时间的峰值,QPS突增,但是持续的时间非常的短暂,所以为了适应这种情况,设计出了核心线程和非核心线程,非核心线程超过空闲时间的阈值,就会被停止掉。

并且,线程池的设计中,核心线程数,也不是已启动线程池就开始创建和corePoolSize一样大小的线程数,而是随着任务的增加,慢慢的创建(懒加载思想),除非是调用了prestartCoreThread/prestartAllCoreThreads,才会事先启动核心线程。

还有我们说到核心线程一般不会被销毁,除非调用方法allowsCoreThreadTimeOut,并且传参为true,则允许闲置的核心线程被终止。

但是,上面的两个行为一般不建议,只是为了扩展对外暴露的方法。

maximumPoolSize:这个参数就是表示核心线程了非核心线程的总数,也就是线程池的最大线程数,当线程数超过这个最大线程数时,就会走拒绝策略。

keepAliveTime:表示空闲线程最多空闲的时间,超过这个阈值就会被回收掉。

unit:空闲时间的单位,例如 TimeUnit.SECONDS

workQueue:是存放任务的队列。上面提及到队列这种东西,workQueue队列有很多种,这里主要列举常用的几种:

  1. ArrayBlockingQueue :是由数组实现的有界的阻塞队列,在初始化的时候,必须指定大小。
  2. LinkedBlockingQueue :是由链表实现的无界的阻塞队列,默认是Integer.MAX_VALUE,也可以初始化的时候指定大小。
  3. DelayQueue:延迟队列,只有延迟期满足才会从队列中获取元素。
  4. SynchronousQueue:是一个不存储元素的阻塞队列。若是插入的时候,已经有一个元素,就会阻塞等待,直到这个元素被移除,反之亦然。
  5. LinkedBlockingDeque:是一个由链表组成的双向阻塞队列。

下面我们来聊一聊一个线程池完整的过程。先来一张流程图,一个完成整的线程池执行的流程如下图所示:

(1)当核心线程数没有满时,就会创建核心线程数来执行任务:

(2)核心线程数满了,就会把任务放在任务队列里面:

(3)若是任务队列也满了,才会创建非核心线程数来执行任务,最后核心线程数+非核心线程数总和已经小于maximumPoolSize

(4)最后,如果线程数的总和已经达到了maximumPoolSize,就会走拒绝策略。

threadFactory:从名字来看是线程工厂,主要是给线程一个标识,比如:阿里规定使用线程池时,建议给线程池一个名字,方便追溯和排查:


    private static final ThreadPoolExecutor pool;

    static {
        // 定义线程池的名字
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
        pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
            threadFactory, new ThreadPoolExecutor.AbortPolicy());
        pool.allowCoreThreadTimeOut(true);
    }

handler:这个就是拒绝策略,有四种拒绝策略,如下:

  1. DiscardPolicy:直接丢弃任务,不做处理,不抛出异常,一般是对应无关紧要的任务。
  2. DiscardOldestPolicy:丢弃队列中最前面的任务,也就是最老的任务,然后尝试执行新任务。
  3. CallerRunsPolicy:由调用者线程进行处理。
  4. AbortPolicy:抛出异常。

具体的源码可以在ThreadPoolExecutor类中看到:

这里直接就抛出了一个异常,实现非常简单,详细的源码,可以参考上面的那个类,里面的代码不难:

默认线程池

那么为什么阿里明确规定,不适用默认的线程池,要使用自定的线程池,下面我们通过源码来分析分析。

默认的线程池,比较常见的主要有以下四种:

  1. Executors.newCachedThreadPool();
  2. Executors.newFixedThreadPool(n);
  3. Executors.newScheduledThreadPool(n);
  4. Executors.newSingleThreadExecutor();

这四个也是我们聊的最多的,面试也会经常被问到,具体源码可以查看java.util.concurrent.Executors,这个类里面还有其他的线程池,为了方面后面深入讲解这几个线程池的区别,我这里整理一下源码:

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

 public static ScheduledExecutorService newScheduledThreadPool(
             int corePoolSize, ThreadFactory threadFactory) {
         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
     }
     
 public ScheduledThreadPoolExecutor(int corePoolSize,
                                        ThreadFactory threadFactory) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
               new DelayedWorkQueue(), threadFactory);
     }

从这几个线程池中可以看到,他们的底层都是调用自定义线程池ThreadPoolExecutor的构造方法实现的,只不过一些参数都已经自定义好了。

从源码中可以看到FixedThreadPoolSingleThreadPool,允许的缓存队列的长度都是Integer.MAX_VALUE,所以他就是存在队列无限长的问题,最终会导致OOM的异常,甚至导致资源耗尽。

CachedThreadPoolnewScheduledThreadPool允许的最大想线程数是Integer.MAX_VALUE,也就是他能无限的创建线程,这样也会导致资源耗尽或者出现OOM异常。

这个也是阿里为什么不推荐使用这几类默认的线程池的原因:

所以理由就是那么简单,直接从源码就能知道原因,所以推荐大家多看看ThreadPoolExecutor的源码。

线程池源码

线程池的源码中主要涉及到其他并发编程的类库有以下几种:ReentrantLock、AQS、AtomicInteger、CAS,其中ReentrantLock、AQS和CAS在我之前的文章中都写过对应的原创,在本号的这里可以找到:

下面就开始我们的源码之旅。首先,我们从最开始的execute执行方法一层一层往里面进行分析:

   public void execute(Runnable command) {
       // 任务为空,则抛出空指针,这里的command,也就是我们提交的任务
        if (command == null)  throw new NullPointerException();
        // 调用get获取ctl的值,ctl表示线程池运行状态和线程数,后面做详细的分析
        int c = ctl.get();
        // 第一步:每一个线程对应一个worker,workerCountOf也就是worker的数量,假如线程数小于corePoolSize(核心线程数)
        if (workerCountOf(c) < corePoolSize) {
            // 满足条件,则尝试创建核心线程执行任务,并且返回结束
            if (addWorker(commandtrue))
                return;
            c = ctl.get();
        }
        // 第二步:说明线程数大于核心线程数,检查是否运行以及添加到workQueue里面,workQueue是BlockingQueue<Runnable> 类型的,并且是在你new ThreadPoolExecutor()的时候进行初始化,通过this.workQueue = workQueue来初始化;
        if (isRunning(c) && workQueue.offer(command)) {
            // 来到这里说明上面的条件都满足,再次获取ctl的值
            int recheck = ctl.get();
            // 重新检查线程池状态,因为上次检测后线程池状态是否为RUNNING,因为有可能状态发生改变,如果非运行状态就从队列中移除任务并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 运行状态并且线程数是0,则创建线程
            else if (workerCountOf(recheck) == 0)
            // 线程数是0,则创建非核心线程,且不指定首次执行任务(firstTask参数为null),这里的第二个参数表示指核心线程还是非核心线程(true为核心),这个方法后面做详细的分析。
                addWorker(null, false);
        }
        // 第三步: 队列满了,尝试创建非核心线程执行任务,第二个参数表示核心和非核心(true表示核心)
        else if (!addWorker(commandfalse))
            // 失败了,就执行拒绝策略
            reject(command);
    }

详细的execute的源码上面的每一行都有注释,写的很清楚,其实这个execute源码就是对应这张图的执行流程:

源码中主要是执行三步逻辑,注释中我都给大家标了第一、二、三步骤了,大家可以结合这个流程图一起看,其实也挺简单的:

  1. 第一步:假如核心线程数还没有大于corePoolSize,就会创建核心线程来执行任务;每一个线程对应一个worker,workerCountOf也就是worker的数量。
  2. 第二步:线程数大于核心线程数,检查是否运行以及将任务添加到workQueue队列里面。后面还做了两个判断若是线程处于非运行状态,就从队列里面reomve掉,并且走拒绝策略;若是线程处于运行状态,并且线程数等于0,就调用addworker创建线程执行,但是当前任务传入为null,即不指定执行任务,因为已经已经将任务添加到队列里面了,由创建的线程从队列里面获取任务然后执行
  3. 第三步:队列满了,尝试创建非核心线程执行任务,第二个参数表示核心和非核心(true表示核心)

下面先来看看**reject(command)**这个方法的源码:

在这里插入图片描述
在这里插入图片描述

它的源码很简单,就是直接handler.rejectedExecution()方法,这个handler拒绝策略对象就是我们在new ThreadPoolExecutor() 的时候参数传进来的。

我们先来回顾一下具体的四种拒绝策略的方案,如下图所示:

这里分享一个小技巧,估计很多人也会知道,使用idea开发的同学,可以使用alt+ctl+鼠标左键就可以找到具体的实现类,而不是点进接口,因为很多情况下一个接口可能有很多的实现类,具体又不知道怎么找,就是使用这个快捷键来找:

  • 我们新来看看这个AbortPolicy策略,它的实现就是直接抛出异常,源码如下:

我们可以发现,拒绝策略的实现类是ThreadPoolExecutor的一个静态内部类的实现方式。

每一个具体的实现类是都实现了这个接口RejectedExecutionHandler,并重写它的rejectedExecution方法。

所以,这里可以发现,假如我们想自己实现自己的拒绝策略,就可以写一个策略类,并实现RejectedExecutionHandler这个接口,然后重写rejectedExecution方法。

其中rejectedExecution方法中有两个参数Runnable r和ThreadPoolExecutor e,其中Runnable 类型就是当前要执行的任务,在实现上随意你怎么自定义实现,而ThreadPoolExecutor 类型的就是当前的线程池对象

最后线程池在初始化的时候,传入我们自己的策略方式就可以实现自定义的拒绝策略

是不是感觉非常方便,这就是设计模式魅力,对扩展开放,对修改封闭,只要实现一个规定的接口,就能够按照自己的实现方式来。

  • 下面我们再来看看第二个CallerRunsPolicy策略的实现源码:

它是判断当前的任务是否被关闭,没有被关闭,就调用run方法,有调用者线程直接执行,并没有交给线程池来处理

  • 第三个拒绝策略DiscardOldestPolicy的源码实现:

它也是同样要判断任务是否被shutdown,然后是取出queue里面最前面的那个任务,也就是最老的任务,抛弃他,然后再次尝试执行execute方法,就这样重复调用

队列的一些常用api,大家可以参考如下图,方便大家阅读源码:

  • 最后一个DiscardPolicy策略,看看他的源码实现:

就一个空方法,啥也不干,简单明了。

上面的四种拒绝策略的源码就讲完了,还是非常简单的,下面我们来看看最重要的方法addWorker,它的含义是要创建线程来执行任务,包括创建核心线程或者非核心线程。

它的传参有两个Runnable firstTaskboolean core第一个参数表示新线程应该首先运行的任务,第二个就表示是否为核心(true为核心线程)

对于第一个参数firstTask,作者是这样解释的:

the task the new thread should run first (or null if none). Workers are created with an initial first task (in method execute()) to bypass queuing when there are fewer than corePoolSize threads (in which case we always start one), or when the queue is full (in which case we must bypass queue). Initially idle threads are usually created via prestartCoreThread or to replace other dying workers

翻译过来:新线程应该首先运行的任务(如果没有,则为null)。当线程少于corePoolSize时(在这种情况下,我们总是启动一个线程),或者当队列已满时(在这种情况下,我们必须绕过队列),将使用初始的第一个任务(在方法execute()中)创建工作线程以绕过队列。最初空闲线程通常是通过预启动CoreThread创建的,或用于替换其他正在死亡的工作线程。

按照我的理解就是一个任务是对应一个执行线程,要么是核心要么是非核心,当前任务就对应新线程的首要任务

下面我们来看看addWorker的源码:

// 若是创建线程成功则返回true
 private boolean addWorker(Runnable firstTask, boolean core) {
        // 做一个标记,相当于goto语句
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池运行时的状态
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&  firstTask == null &&! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 根据传入的core,若是core(核心线程)则不能超过corePoolSize ,非核心则不能超过maximumPoolSize
                if (wc >= CAPACITY ||  wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过CAS的方式将线程数+1,设置成功,退出循环,执行后面的逻辑
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); 
                // 若是线程池状态改变,退回retry重新执行
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 表示线程是否启动成功,返回的标识
        boolean workerStarted = false;
        // 表示线程是否添加成功,即添加到HashSet中
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 初始化worker,执行new worker的时候,就会通过threadFactory生成生成一个新的线程。
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获取锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 比较当前线程池的状态,若是小于SHUTDOWN ,实际上就是只有处于RUNNING或者(等于SHUTDOWN 与firstTask 为空的时候)满足条件
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                       // 检查线程是否已经启动,按理来说新创建的线程,还没有执行start方法说明还没启动的。
                        if (t.isAlive())   throw new IllegalThreadStateException();
                        // 这个workers实际上是一个HashSet结构,把线程存入HashSet   中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)  largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 开启线程执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败
            if (! workerStarted)  addWorkerFailed(w);
        }
        return workerStarted;
    }

上面涉及到一个线程池的状态,通过runStateOf方法获取,下面我们来看看实际源码中它的状态是怎么样的:

在源码中可以看到线程池的状态一共有五种,并且在注释中分别解释了每一种状态到另一种状态变化的分别调用了什么方法,上面都已经详细帮你写好注释了。

从上面的源码中可以看到,主要是做了以下几件事:

  1. 通过core确定核心数或者非核心数。
  2. 创建worker,并通过threadFactory创建新的线程。
  3. 把新线程添加到workers里面。
  4. 最后启动线程。

其中通过ReentrantLock来保证线程安全,并且通过Worker来包装创建的线程:

 private final class Worker extends AbstractQueuedSynchronizer  implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;
        // 初始化worker
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            // 通过getThreadFactory也就是获取new ThreadPoolExecutor()时传入的threadFactory来创建新的线程。
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
        
        // 当前线程是否已经被别人持有
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        
        // 尝试获取线程
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 释放线程
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        // 中断线程
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

这个Worker类继承了AbstractQueuedSynchronizer(AQS)以及实现了Runnable接口,所以AQS(独占锁)的功能它都具有,通过AQS里面的status表示当前线程是否在运行,以及使用isHeldExclusively(也就status状态值的维护),判断独占状态,说明线程正在执行任务,如果是非独占状态,表明线程是空闲的

代码都很简单,其中比较重要的就是run方法,在run方法里面调用了runWorker(this) 方法,我们详细来看看 runWorker是个什么鬼:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出Worker里面的任务,有可能为空
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            // task不为空或者getTask能够从队列中取出任务
            while (task != null || (task = getTask()) != null) {
                // 上锁
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&  runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务前执行的方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务后执行的方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           // 超时没有取到任务,则回收空闲超时的线程
            processWorkerExit(w, completedAbruptly);
        }
    }

从上面的源码中可以看出runWorker主要做这几件事:

  1. 取出worker里面的task或者不断的循环getTask取出队列里面的任务。
  2. 最后就是执行任务。

我们来研究一下getTask的源码:

 private Runnable getTask() {
        boolean timedOut = false;
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            // allowCoreThreadTimeOut 表示开启核心线程超过空闲时间进行回收,默认是不回收,可以通过方法设置为true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) return null;
                continue;
            }
            
            try {
                // 从workQueue里面取出任务
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)  return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

它的最主要的核心代码就是这句workQueue.take(),通过for死循环从workQueue里面取出任务,然后返回。就这一个最重要的功能。

到这里基本线程池的源码都已经讲完了,都过了一遍,大家可以参考我的再详细自己看一遍源码。

线程数的配置

面试官:你知道线程池中线程数怎么进行设置吗?我相信很多开发人员在面试的时候都有被问到过,我之前看很多技术号主都有写,这块的内容。

在实际的开发中,设置合理的线程数参数,又可以参考的公式,主要是分为IO密集型CPU密集型两种:

  1. CPU密集型任务:因为CPU被频繁的占用,所以对于这类的设置,线程数应该尽量少,参考的公式为:Ncpu+1,这个Ncpue可以通过Runtime.getRuntime().availableProcessors()来获取,表示CPU的物理数。
  2. IO密集型任务:因为不是一直执行任务,所以对于这种可以设置大一些的线程数,参考公式为 2*Ncpu

但是,我个人认为这种公式没有最适合的,我们一般是结合压测来进行设置,先预先设置一个比较大的线程数,然后进行压测,使用监控狗监控CPU和内存的变化来修改线程数

线程池设计模式

在线程池中比较明显的设计模式就一个是策略模式(拒绝策略),还有就是工厂模式threadFactory、以及生产者-消费者模式。

策略模式:这个实现主要是通过策略接口RejectedExecutionHandler接口,并且不同的策略重写rejectedExecution方法,也可以自定义实现RejectedExecutionHandler接口,进行自定义策略的实现。

工厂模式threadFactory:这个就太简单的,这里就不想细说了。

生产者-消费者模式:生产者就是生产任务,通过execute方法创建新的线程以及添加新的任务:

 public void execute(Runnable command) {
        ...
        if (isRunning(c) && workQueue.offer(command)) { isRunning()
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        ...
    }

而消费者通过getTask不断的循环从队列里面取出任务来进行消费,其中任务队列就是作为生产者和消费者的中间媒介,生产者往队列里面塞任务,消费者从队列里面消费任务:

private Runnable getTask() {
        for (;;) {
            ...
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            ...
        }
    }

线程池面试

最后,大概讲一下线程池的面试,不过基本都已经在上面的文章中都有体现了,主要有以下几类面试问题:

  1. 说说你对线程池的了解?
  2. 你用过线程池吗?用到哪些场景?
  3. 线程池的运行原理你知道吗?
  4. 线程池的拒绝策略了解过吗?
  5. 在实际使用线程池的过程中线程数时怎么设置的?
  6. 有实际的线程池调优的经验吗?

其实,大概就是这几类,基本上面都有讲解,调优经验无非就是让线程池运行更加合理,包括线程数的设置,拒绝策略的选择,以及对线程池业务的拆分。

因为有些项目基本就是一个线程池,所以就会暴露出一些问题,比如一个线程池中,任务快的、慢的都叠在一起,慢的严重影响线程池的效率。

所以,一般建议可以按照不同的业务进行拆分成不同的线程池。

好了,今天的完整版线程池的讲解就到这里,我是黎杜,我们下一期见,假如觉得文章对你有帮助的,欢迎点个赞或者三连一下呀(无限感激,阿里嘎多)

黎杜

2021/08/25  阅读:37  主题:默认主题

作者介绍

黎杜