Loading...
墨滴

楼仔

2021/05/21  阅读:27  主题:橙心

【Java并发编程系列7】线程池基本知识

主要讲解Java线程池的基础知识。

前言

目前书籍《Java并发编程实战》看到“第7章:取消与关闭”,里面涉及到部分线程池的内容,然后第8章就是线程池,所以打算把之前看的线程池的资料再整理一下,便于后面能更好理解书中的内容。

之前看过一篇博客,关于线程池的内容讲解的非常好,我只截取基础知识部分,把Java基础内容全部掌握后,再对里面的原理部分进行深入理解,后面会附上该篇博客的链接。

初识线程池

我们知道,线程的创建和销毁都需要映射到操作系统,因此其代价是比较高昂的。出于避免频繁创建、销毁线程以及方便线程管理的需要,线程池应运而生。

线程池优势

  • 降低资源消耗:线程池通常会维护一些线程(数量为 corePoolSize),这些线程被重复使用来执行不同的任务,任务完成后不会销毁。在待处理任务量很大的时候,通过对线程资源的复用,避免了线程的频繁创建与销毁,从而降低了系统资源消耗。
  • 提高响应速度:由于线程池维护了一批 alive 状态的线程,当任务到达时,不需要再创建线程,而是直接由这些线程去执行任务,从而减少了任务的等待时间。
  • 提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控。

线程池设计思路

有句话叫做艺术来源于生活,编程语言也是如此,很多设计思想能映射到日常生活中,比如面向对象思想、封装、继承,等等。今天我们要说的线程池,它同样可以在现实世界找到对应的实体——工厂。先假想一个工厂的生产流程:

工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。我们做如下一番映射:

  • 工厂——线程池
  • 订单——任务(Runnable)
  • 正式工人——核心线程
  • 临时工——普通线程
  • 仓库——任务队列
  • 调度员——getTask()

getTask()是一个方法,将任务队列中的任务调度给空闲线程。

映射后,形成线程池流程图如下,两者是不是有异曲同工之妙?

点评一下:感觉作者的这个类比,太TM经典了!!!直接抓住了线程调用的精髓。这就是为什么看书时不能只盯着书看,可能你看半天都不懂书中讲的啥,找一篇经典的博客,瞬间给你拨开云雾,而且还印象深刻。

这样,线程池的工作原理或者说流程就很好理解了,提炼成一个简图:

深入线程池

那么接下来,问题来了,线程池是具体如何实现这套工作机制的呢?从Java线程池Executor框架体系可以看出:线程池的真正实现类是ThreadPoolExecutor,因此我们接下来重点研究这个类。

构造方法

研究一个类,先从它的构造方法开始。ThreadPoolExecutor提供了4个有参构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
 
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)
 
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
 
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
 
{
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

感觉构造方法有点多,其实前面的基础参数都是一样的,就后面两个可选参数“线程工厂”和“拒绝策略”不一样,组合一下就是2*2=4种情况。

解释一下构造方法中涉及到的参数:

  • corePoolSize(必需):核心线程数。即池中一直保持存活的线程数,即使这些线程处于空闲。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。
  • maximumPoolSize(必需):池中允许的最大线程数。当核心线程全部繁忙且任务队列打满之后,线程池会临时追加线程,直到总线程数达到maximumPoolSize这个上限。
  • keepAliveTime(必需):线程空闲超时时间。当非核心线程处于空闲状态的时间超过这个时间后,该线程将被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
  • unit(必需):keepAliveTime参数的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)
  • workQueue(必需):任务队列,采用阻塞队列实现。当核心线程全部繁忙时,后续由execute方法提交的Runnable将存放在任务队列中,等待被线程处理。
  • threadFactory(可选):线程工厂。指定线程池创建线程的方式。
  • handler(可选):拒绝策略。当线程池中线程数达到maximumPoolSize且workQueue打满时,后续提交的任务将被拒绝,handler可以指定用什么方式拒绝任务。

了解完“线程的设计思路”,再看构造方法的这些参数,感觉就非常容易懂了。

任务队列

使用ThreadPoolExecutor需要指定一个实现了BlockingQueue接口的任务等待队列。在ThreadPoolExecutor线程池的API文档中,一共推荐了三种等待队列,它们是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;

  1. SynchronousQueue:同步队列。这是一个内部没有任何容量的阻塞队列,任何一次插入操作的元素都要等待相对的删除/读取操作,否则进行插入操作的线程就要一直等待,反之亦然。
  2. LinkedBlockingQueue:无界队列(严格来说并非无界,上限是Integer.MAX_VALUE),基于链表结构。使用无界队列后,当核心线程都繁忙时,后续任务可以无限加入队列,因此线程池中线程数不会超过核心线程数。这种队列可以提高线程池吞吐量,但代价是牺牲内存空间,甚至会导致内存溢出。另外,使用它时可以指定容量,这样它也就是一种有界队列了。
  3. ArrayBlockingQueue:有界队列,基于数组实现。在线程池初始化时,指定队列的容量,后续无法再调整。这种有界队列有利于防止资源耗尽,但可能更难调整和控制。

另外,Java还提供了另外4种队列:

  1. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。存放在PriorityBlockingQueue中的元素必须实现Comparable接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue不会保证优先级一样的元素的排序,也不保证当前队列中除了优先级最高的元素以外的元素,随时处于正确排序的位置。
  2. DelayQueue:延迟队列。基于二叉堆实现,同时具备:无界队列、阻塞队列、优先队列的特征。DelayQueue延迟队列中存放的对象,必须是实现Delayed接口的类对象。通过执行时延从队列中提取任务,时间没到任务取不出来。更多内容请见DelayQueue。
  3. LinkedBlockingDeque:双端队列。基于链表实现,既可以从尾部插入/取出元素,还可以从头部插入元素/取出元素。
  4. LinkedTransferQueue:由链表结构组成的无界阻塞队列。这个队列比较特别的时,采用一种预占模式,意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素。

感觉这提供的队列有些多啊,总共7个!说实话,我现在还不知道实际场景用哪个比较好,后面我们可以看看Java封装好的线程池,里面用的队列都是哪种。

拒绝策略

线程池有一个重要的机制:拒绝策略。当线程池workQueue已满且无法再创建新线程池时,就要拒绝后续任务了。拒绝策略需要实现RejectedExecutionHandler接口,不过Executors框架已经为我们实现了4种拒绝策略:

  1. AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
  2. CallerRunsPolicy:直接运行这个任务的run方法,但并非是由线程池的线程处理,而是交由任务的调用线程处理。
  3. DiscardPolicy:直接丢弃任务,不抛出任何异常。
  4. DiscardOldestPolicy:将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行。

线程工厂指定创建线程的方式,这个参数不是必选项,Executors类已经为我们非常贴心地提供了一个默认的线程工厂:

/**
 * The default thread factory
 */

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

线程池状态

线程池有5种状态:

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState表示当前线程池的状态,它是一个 volatile 变量用来保证线程之间的可见性。

下面的几个static final变量表示runState可能的几个取值,有以下几个状态:

  • RUNNING:当创建线程池后,初始时,线程池处于RUNNING状态;
  • SHUTDOWN:如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  • STOP:如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
  • TERMINATED:当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

初始化&容量调整&关闭

线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程
  • prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程,并返回初始化的线程数
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}

线程池关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池容量调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小 当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

使用线程池

ThreadPoolExecutor

通过构造方法使用ThreadPoolExecutor是线程池最直接的使用方式,下面看一个实例:

public class ThreadPoolExecutorTest {
    public static void main(String args[]) {
        // 创建线程池(核心线程数是3,最大线程数是5,超时时间是5秒)
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        // 向线程池提交任务
        for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 2; j++) {
                        System.out.println(Thread.currentThread().getName() + ":" + j);
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        // 关闭线程池
        threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
        // threadPool.shutdownNow(); // 设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
    }
}
// 输出:
// pool-1-thread-1:0
// pool-1-thread-3:0
// pool-1-thread-2:0
// pool-1-thread-2:1
// pool-1-thread-1:1
// pool-1-thread-3:1

Executors封装线程池

另外,Executors封装好了4种常见的功能线程池(还是那么地贴心):

FixedThreadPool

固定容量线程池。其特点是最大线程数就是核心线程数,意味着线程池只能创建核心线程,keepAliveTime为0,即线程执行完任务立即回收。任务队列未指定容量,代表使用默认值Integer.MAX_VALUE。适用于需要控制并发线程的场景。

// 使用默认线程工厂
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 需要自定义线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

使用示例:

public class FixedThreadPoolTest {
    public static void main(String args[]) {
        // 创建线程池对象,设置核心线程和最大线程数为5
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        fixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("Throw Exception.");
                }
                System.out.println(Thread.currentThread().getName() + " after sleep, is still running.");
            }
        });
        //fixedThreadPool.shutdown();
        fixedThreadPool.shutdownNow(); // 不建议这样使用,很危险,这里仅用于测试
    }
}
// 输出:
// pool-1-thread-1 is running.
// Throw Exception.
// pool-1-thread-1 after sleep, is still running.
// java.lang.InterruptedException: sleep interrupted
//  at java.lang.Thread.sleep(Native Method)
//  at com.java.parallel.pool.FixedThreadPoolTest$1.run(FixedThreadPoolTest.java:15)
//  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
//  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
//  at java.lang.Thread.run(Thread.java:748)

作者的这个示例,我加了一点料,我就想看看直接中断会是什么效果,结果发现直接中断后,线程直接抛出异常,我捕获异常后,输出了一些结果。正常情况下,捕获异常是需要做一些处理,我这里仅作测试。

SingleThreadExecutor

单线程线程池。特点是线程池中只有一个线程(核心线程),线程执行完任务立即回收,使用有界阻塞队列(容量未指定,使用默认值Integer.MAX_VALUE)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(11,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
// 为节省篇幅,省略了自定义线程工厂方式的源码

使用示例:

public class SingleThreadExecutorTest {
    public static void main(String args[]) {
        // 创建线程池对象,设置核心线程和最大线程数为1
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        singleThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
            }
        });
        singleThreadPool.shutdown();
    }
}
// 输出:
// pool-1-thread-1 is running.

ScheduledThreadPool

定时线程池。指定核心线程数量,普通线程数量无限,线程执行完任务立即回收,任务队列为延时阻塞队列。这是一个比较特别的线程池,适用于执行定时或周期性的任务。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// 继承了 ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService 
{
    // 构造函数,省略了自定义线程工厂的构造函数
 public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           new DelayedWorkQueue());
 }
 
 // 延时执行任务
 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        ...
    }
 // 定时执行任务
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

使用示例:

public class ScheduledThreadPoolTest {
    public static void main(String args[]) {
        // 创建定时线程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        // 向线程池提交任务
        scheduledThreadPool.schedule(new Runnable(){
            public void run() {
                System.out.println(Thread.currentThread().getName() + "--->运行");
            }
        }, 5, TimeUnit.SECONDS); // 延迟5s后执行任务
        scheduledThreadPool.shutdown();
    }
}
// 输出:
// pool-1-thread-1--->运行

CachedThreadPool

缓存线程池。没有核心线程,普通线程数量为Integer.MAX_VALUE(可以理解为无限),线程闲置60s后回收,任务队列使用SynchronousQueue这种无容量的同步队列。适用于任务量大但耗时低的场景。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

使用示例:

public class CachedThreadPoolTest {
    public static void main(String args[]) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable(){
            public void run() {
                System.out.println(Thread.currentThread().getName() + "--->运行");
            }
        });
        cachedThreadPool.shutdown();
    }
}
// 输出:
// pool-1-thread-1--->运行

最后总结一下Executors封装线程池,每种方式用的是哪种队列:

  • 固定容量线程池FixedThreadPool:LinkedBlockingQueue无界队列
  • 单线程线程池SingleThreadExecutor:LinkedBlockingQueue无界队列
  • 定时线程池ScheduledThreadPool:DelayQueue延迟队列
  • 缓存线程池CachedThreadPool:SynchronousQueue同步队列

稍微解读一下:

  • FixedThreadPool:构造函数的keepAliveTime=0,然后核心线程个数和最大线程个数都限制死了,所以虽然用的是LinkedBlockingQueue无界队列,但是其实不会用到无界的特性,其实是当有界来用。
  • SingleThreadExecutor:核心线程和最大线程都是1,keepAliveTime=0,这个也是拿LinkedBlockingQueue无界队列当有界队列使用。
  • ScheduledThreadPool:因为需要延时,这个使用DelayQueue延迟队列就没有任何毛病。
  • CachedThreadPool:没有核心线程,普通线程设置成最大值,用的SynchronousQueue没有任何容量,这个不太理解,后面再研究一下。

总结

这篇文章其实是我刚开始接触线程池时找到了,感觉非常经典,刚好现在需要系统学习Java并发编程,就把这篇文章重新整理一下,然后有些地方加入自己的解读,希望对大家有所帮助。

欢迎大家多多点赞,更多文章,请关注微信公众号“楼仔进阶之路”,点关注,不迷路~~

楼仔

2021/05/21  阅读:27  主题:橙心

作者介绍

楼仔