17 | Executor组件:Tomcat如何扩展Java线程池?

在开发中我们经常会碰到“池”的概念,比如数据库连接池、内存池、线程池、常量池等。为什么需要“池”呢?程序运行的本质,就是通过使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理,比如在JVM中创建一个对象实例需要消耗CPU和内存资源,如果你的程序需要频繁创建大量的对象,并且这些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。

而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。在Java中万物皆对象,线程也是一个对象,Java线程是对操作系统线程的封装,创建Java线程也需要消耗系统资源,因此就有了线程池。JDK中提供了线程池的默认实现,我们也可以通过扩展Java原生线程池来实现自己的线程池。

同样,为了提高处理能力和并发度,Web容器一般会把处理请求的工作放到线程池里来执行,Tomcat扩展了原生的Java线程池,来满足Web容器高并发的需求,下面我们就来学习一下Java线程池的原理,以及Tomcat是如何扩展Java线程池的。

Java线程池

简单的说,Java线程池里内部维护一个线程数组和一个任务队列,当任务处理不过来的时,就把任务放到队列里慢慢处理。

ThreadPoolExecutor

我们先来看看Java线程池核心类ThreadPoolExecutor的构造函数,你需要知道ThreadPoolExecutor是如何使用这些参数的,这是理解Java线程工作原理的关键。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

每次提交任务时,如果线程数还没达到核心线程数corePoolSize,线程池就创建新线程来执行。当线程数达到corePoolSize后,新增的任务就放到工作队列workQueue里,而线程池中的线程则努力地从workQueue里拉活来干,也就是调用poll方法来获取任务。

如果任务很多,并且workQueue是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略handler,比如抛出异常或者由调用者线程来执行任务等。

如果高峰过去了,线程池比较闲了怎么办?临时线程使用poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意poll方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。

那还有一个参数threadFactory是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。

FixedThreadPool/CachedThreadPool

Java提供了一些默认的线程池实现,比如FixedThreadPool和CachedThreadPool,它们的本质就是给ThreadPoolExecutor设置了不同的参数,是定制版的ThreadPoolExecutor。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从上面的代码你可以看到:

  • FixedThreadPool有固定长度(nThreads)的线程数组,忙不过来时会把任务放到无限长的队列里,这是因为LinkedBlockingQueue默认是一个无界队列
  • CachedThreadPool的maximumPoolSize参数值是Integer.MAX_VALUE,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是SynchronousQueue,表明队列长度为0。

Tomcat线程池

跟FixedThreadPool/CachedThreadPool一样,Tomcat的线程池也是一个定制版的ThreadPoolExecutor。

定制版的ThreadPoolExecutor

通过比较FixedThreadPool和CachedThreadPool,我们发现它们传给ThreadPoolExecutor的参数有两个关键点:

  • 是否限制线程个数。
  • 是否限制队列长度。

对于Tomcat来说,这两个资源都需要限制,也就是说要对高并发进行控制,否则CPU和内存有资源耗尽的风险。因此Tomcat传入的参数是这样的:

//定制版的任务队列
taskqueue = new TaskQueue(maxQueueSize);

//定制版的线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());

//定制版的线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);

你可以看到其中的两个关键点:

  • Tomcat有自己的定制版任务队列和线程工厂,并且可以限制任务队列的长度,它的最大长度是maxQueueSize。
  • Tomcat对线程数也有限制,设置了核心线程数(minSpareThreads)和最大线程池数(maxThreads)。

除了资源限制以外,Tomcat线程池还定制自己的任务处理流程。我们知道Java原生线程池的任务处理逻辑比较简单:

  1. 前corePoolSize个任务时,来一个任务就创建一个新线程。
  2. 后面再来任务,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到maximumPoolSize,执行拒绝策略。

Tomcat线程池扩展了原生的ThreadPoolExecutor,通过重写execute方法实现了自己的任务处理逻辑:

  1. 前corePoolSize个任务时,来一个任务就创建一个新线程。
  2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到maximumPoolSize,则继续尝试把任务添加到任务队列中去。
  4. 如果缓冲队列也满了,插入失败,执行拒绝策略。

观察Tomcat线程池和Java原生线程池的区别,其实就是在第3步,Tomcat在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。那具体如何实现呢,其实很简单,我们来看一下Tomcat线程池的execute方法的核心代码。

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  
  ...
  
  public void execute(Runnable command, long timeout, TimeUnit unit) {
      submittedCount.incrementAndGet();
      try {
          //调用Java原生线程池的execute去执行任务
          super.execute(command);
      } catch (RejectedExecutionException rx) {
         //如果总线程数达到maximumPoolSize,Java原生线程池执行拒绝策略
          if (super.getQueue() instanceof TaskQueue) {
              final TaskQueue queue = (TaskQueue)super.getQueue();
              try {
                  //继续尝试把任务放到任务队列中去
                  if (!queue.force(command, timeout, unit)) {
                      submittedCount.decrementAndGet();
                      //如果缓冲队列也满了,插入失败,执行拒绝策略。
                      throw new RejectedExecutionException("...");
                  }
              } 
          }
      }
}

从这个方法你可以看到,Tomcat线程池的execute方法会调用Java原生线程池的execute去执行任务,如果总线程数达到maximumPoolSize,Java原生线程池的execute方法会抛出RejectedExecutionException异常,但是这个异常会被Tomcat线程池的execute方法捕获到,并继续尝试把这个任务放到任务队列中去;如果任务队列也满了,再执行拒绝策略。

定制版的任务队列

细心的你有没有发现,在Tomcat线程池的execute方法最开始有这么一行:

submittedCount.incrementAndGet();

这行代码的意思把submittedCount这个原子变量加一,并且在任务执行失败,抛出拒绝异常时,将这个原子变量减一:

submittedCount.decrementAndGet();

其实Tomcat线程池是用这个变量submittedCount来维护已经提交到了线程池,但是还没有执行完的任务个数。Tomcat为什么要维护这个变量呢?这跟Tomcat的定制版的任务队列有关。Tomcat的任务队列TaskQueue扩展了Java中的LinkedBlockingQueue,我们知道LinkedBlockingQueue默认情况下长度是没有限制的,除非给它一个capacity。因此Tomcat给了它一个capacity,TaskQueue的构造函数中有个整型的参数capacity,TaskQueue将capacity传给父类LinkedBlockingQueue的构造函数。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  public TaskQueue(int capacity) {
      super(capacity);
  }
  ...
}

这个capacity参数是通过Tomcat的maxQueueSize参数来设置的,但问题是默认情况下maxQueueSize的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题,TaskQueue重写了LinkedBlockingQueue的offer方法,在合适的时机返回false,返回false表示任务添加失败,这时线程池会创建新的线程。那什么是合适的时机呢?请看下面offer方法的核心源码:

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  ...
   @Override
  //线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
  public boolean offer(Runnable o) {

      //如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
      if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
          return super.offer(o);
          
      //执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
      //表明是可以创建新线程的,那到底要不要创建呢?分两种情况:
      
      //1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
      if (parent.getSubmittedCount()<=(parent.getPoolSize())) 
          return super.offer(o);
          
      //2. 如果已提交的任务数大于当前线程数,线程不够用了,返回false去创建新线程
      if (parent.getPoolSize()<parent.getMaximumPoolSize()) 
          return false;
          
      //默认情况下总是把任务添加到任务队列
      return super.offer(o);
  }
  
}

从上面的代码我们看到,只有当前线程数大于核心线程数、小于最大线程数,并且已提交的任务个数大于当前线程数时,也就是说线程不够用了,但是线程数又没达到极限,才会去创建新的线程。这就是为什么Tomcat需要维护已提交任务数这个变量,它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程

当然默认情况下Tomcat的任务队列是没有限制的,你可以通过设置maxQueueSize参数来限制任务队列的长度。

本期精华

池化的目的是为了避免频繁地创建和销毁对象,减少对系统资源的消耗。Java提供了默认的线程池实现,我们也可以扩展Java原生的线程池来实现定制自己的线程池,Tomcat就是这么做的。Tomcat扩展了Java线程池的核心类ThreadPoolExecutor,并重写了它的execute方法,定制了自己的任务处理流程。同时Tomcat还实现了定制版的任务队列,重写了offer方法,使得在任务队列长度无限制的情况下,线程池仍然有机会创建新的线程。

课后思考

请你再仔细看看Tomcat的定制版任务队列TaskQueue的offer方法,它多次调用了getPoolSize方法,但是这个方法是有锁的,锁会引起线程上下文切换而损耗性能,请问这段代码可以如何优化呢?

不知道今天的内容你消化得如何?如果还有疑问,请大胆的在留言区提问,也欢迎你把你的课后思考和心得记录下来,与我和其他同学一起讨论。如果你觉得今天有所收获,欢迎你把它分享给你的朋友。

精选留言

  • 永光

    2019-06-18 11:52:44

    观察 Tomcat 线程池和 Java 原生线程池的区别,其实就是在第 3 步,Tomcat 在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。
    问题:
    感觉这两种方式都一样呀,前corePoolSize都是直接创建线程来处理。后续都是先放在队列里面,满了在创建临时线程来处理。 Tomcat线程池,在达到max时 再次检测,并尝试插入队列有什么意义呢?我理解再次检测队列也是满的呀?
    2、
    作者回复

    有可能第一次尝试放队列是满的,失败,再尝试创建临时线程,也满了,但是这个过程中,队列中的任务可能被临时线程消费了一部分,再往队列中送可能会成功。

    2019-06-18 23:55:10

  • 微思

    2019-06-19 19:01:34

    给李老师点赞👍解析得非常到位!
  • ℡ㄨ和尚ふ

    2021-10-16 22:36:29

    我的理解是:比如核心线程数为4,总线程数为10个。通过execute方法提交个4个任务,消耗了4个核心线程,所以此时的getPoolSise得到的就是4个线程数。
    当第5个任务调教到线程池中时,因为已经创建了4个核心的线程,此时会尝试放入队列taskQueue中
    条件1:parent.getPoolSize() == parent.getMaximumPoolSize() -》 4 == 10不成立
    条件2:parent.getSubmittedCount()<=(parent.getPoolSize()) -》 5 <= 4 不成立
    条件3:parent.getPoolSize()<parent.getMaximumPoolSize() -》 4 < 10 成立
    可以理解为此时线程池中只有4个线程,但是任务有5个,如果四个线程统统阻塞在自己的任务上的话,第5个任务是迟迟得不到执行的,也就是说5个任务超出了4个线程的处理能力,而且此时没有超出最大线程数限制,所以这里可以理解为任务队列的容量为0,创建一个新的线程进行处理。
    此时当前线程数有了5个。
    当第6个任务进来的时候,如果前5个线程都阻塞在自己的任务上的话,之后的分析过程和前面类似。
    但是假设5个线程中有一个任务已经执行完毕了,那么此时线程池中的未完成任务数为5,线程数也为5,就表示有一个线程是空闲的,那么 5 <= 5 满足条件2,就将当前第5个任务加入到任务队列当中,由空闲线程从任务队列中取出进行执行。
    我的理解是taskQueue队列的容量是动态变化的,取决于当前线程池中的空闲的线程数。但是当已创建线程数已经等于最大限制线程数的时候,任务队列就退化成了无界队列,这样来讲的话,默认情况下感觉拒绝策略是没有机会执行的(可以通过设置 maxQueueSize 参数来限制任务队列的长度,这样就可以执行拒绝策略了),不知道理解的对不对
  • 世纪猛男

    2019-06-18 23:55:37

    关于今日的思考题 getPoolSize. 用Volatile去修饰一个变量不可行,因为变更过程,会基于之前的pool size,无法做到原子操作。 用atomic 也不合适 并发量高的时候 会导致 大量的更新失败, 持续消耗CPU。 所以还不如加锁来的痛快。 请教老师的想法
    作者回复

    可以加锁,但是没有必要多次调用,调一次把结果存起来就行。

    2019-06-20 21:32:19

  • 吴大山

    2019-10-14 20:26:05

    Tomcat 线程池扩展了原生的 ThreadPoolExecutor,通过重写 execute 方法实现了自己的任务处理逻辑:
    1. xxx
    2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
    3. xxx
    4. xxx

    我细看了一下第二步代码:
    方法定位:org.apache.tomcat.util.threads.TaskQueue#offer
    逻辑定位:if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;

    这么看逻辑好像是:再来任务的话,如果线程数少于maximumPoolSize时,都会优先使用线程,而不会入队
  • 吃饭饭

    2019-07-17 08:33:24

    老师,TaskQueue 重写了 offer 方法的关键是什么?是 TaskQueue(int capacity) ,只是把无界变有界了吗?每台看明白 offer 具体的改变是什么
    作者回复

    offer方法返回false表示添加失败,添加失败就会创建新线程。

    TaskQueue的父类总是返回true,但是TaskQueue就不会总是返回true了,可能是false,区别在这里。

    2019-07-19 23:38:34

  • 迎风劲草

    2019-06-18 22:27:48

    老师,核心线程如果超过keeplive时间,是否也会回收?还有如果我的队列中还有等待执行的runable,这时候kill 进程,时候需要等到所有runable被执行要,进程才结束吗?
    作者回复

    1.可以调用ThreadPoolExecutor的这个方法来指定是否回收核心线程:
    public void allowCoreThreadTimeOut(boolean value)

    2.kill进程会立即退出,内核会负责清理这个进程的所有资源。

    2019-06-18 23:07:52

  • 2021-11-01 17:24:31

    平时开发的Web系统通常都有大量的 IO 操作,比方说查询数据库、查询缓存等等。任务在执行 IO 操作的时候 CPU就空闲了下来,这时如果增加执行任务的线程数而不是把任务暂存在队列中,就可以在单位时间内执行更多的任务,大大提高了任务执行的吞吐量。
    Tomcat 使用的线程池就不是 JDK 原生的线程池,而是做了一些改造,当线程数超过 coreThreadCount 之后会优先创建线程,直到线程数到达 maxThreadCount,这样就比较适合于 Web 系统大量 IO 操作的场景了
    --摘自《高并发系统40问》
  • 13963865700

    2019-06-26 16:49:43

    老师,您好,请问:
    1.Tomcat在默认队列长度无限制的情况下,是不是不会触发拒绝策略,即使线程数达到maxQueueSize也一直把任务放队列中?
    2.这种情况会不会拖垮Tomcat,发生内存溢出?

    作者回复

    为了解决这个问题,Tomcat的定制版任务队列TaskQueue 重写了 LinkedBlockingQueue 的 offer 方法,在合适的时机返回 false,返回 false 表示任务添加失败,这时线程池会创建新的线程。

    2019-06-27 14:50:40

  • -W.LI-

    2019-06-20 00:35:04

    李老师好。我有个问题,原生队列是在队列满时新建线程处理。然后当线程达到最大线程数的时候,不就是队列已满,线程也开满了么。Tomcat补获异常后再往队列里放一次,只是为了做后的努力争取不丢任务么?
    作者回复

    对的

    2019-06-20 22:13:53

  • Standly

    2019-06-18 23:19:09

    感觉直接读workers.size()就可以了么,因为创建线程和销毁线程的方法都加锁了,而且是同一把锁,不懂为啥getPoolSize()方法还要额外加锁?
    作者回复

    是的,这个地方Tomcat的实现可以简化。

    2019-06-19 00:11:02

  • HARDMAN

    2019-07-06 12:16:54

    请教老师,如果线程池已满,任务队列也满了,那么tomcat会拒绝后面的请求,这时如何进一步增强tomcat的处理能力,让它能同时处理更多请求呢?
    作者回复

    只能提高web应用的处理速度,否则只能排队,可以把队列长度加大,不推荐这样做,因为客户端需要等较长时间,个人感觉还不如直接返回错误

    2019-07-06 22:07:37

  • maybe

    2020-08-06 21:08:18

    1、java线程池内部维护一个数组和队列,当任务处理不过来的时候把任务放队列里慢慢处理
    2、ThreadPoolExecutor核心类,有这些参数corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、keepAliveTime(线程空闲时间)、unit(空闲时间单位)、workQueue(任务队列)、threadFactory(线程创建工厂)、handler(拒绝策略)
    3、ThreadPoolExecutor这些参数的作用是这样的:当提交任务的时候,如果线程数还未到达核心线程数,那么就直接创建新线程。继续提交任务,如果线程数到达核心线程数,那么就把任务放入队列中。如果队列满了还有任务提交过来,那么就需要创建临时线程,如果还有任务来,线程数到达了最大线程数,那么就会执行拒绝策略。threadFactory用来扩展原生的线程工厂,比如可以设置一个有意义的线程名称。keepAliveTime, unit这两个参数的意义是如果高峰期过了,线程都比较闲了,这些线程去任务队列使用poll方法拉活(poll方法设置了超时时间),如果拉不到活,那么就把这些线程销毁。
    4、FixedThreadPool、CachedThreadPool都是ThreadPoolExecutor的定制版,设置了不同的参数而已。
    FixedThreadPool设置了(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())这几个参数,固定线程数组长度,LinkedBlockingQueue默认是个无界队列,当任务处理不来,都丢到队列里等待。
    CachedThreadPool设置了(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()),最大线程数设置为 Integer.MAX_VALUE,相当于无限制,忙不过来的时候就不断创建临时线程。它的任务队列是 SynchronousQueue,表明队列长度为 0。
    5、tomcat的线程池也是ThreadPoolExecutor的定制版,不同的是如果线程数已大于核心线程数,当继续创建临时线程达到最大线程数的时候,并没有直接执行拒绝策略,而是重写了excute逻辑,当发生RejectedExecutionException时候进行了捕获,再尝试把任务往队列里放,如果还是满的,那么再执行拒绝策略。目的就是再次检查,如果任务被消耗了,那么就继续保留任务了,尽可能的保留任务。


  • chen

    2023-08-08 11:14:52

    我感觉老师在这一课上讲得不够完善,tomcat的线程池和原生线程还有一个不同点,就是当核心线程数满了,但是还未达到最大线程数量的时候,这时候直接触发非核心线程的创建,而不是加入队列。
  • FOCUS

    2022-05-10 15:53:48

    tomcat相当于综合了FixedThreadPool,CachedThreadPool各自的特点了, 优先使用线程处理,超过最大线程数就放在队列中等待处理
  • 进击的巨人

    2020-11-20 09:18:26

    既然要限制队列长度,为何不直接用arrayblockingquene啊
  • Gavin

    2020-09-13 17:24:27

    如果高峰过去了,线程池比较闲了怎么办?临时线程使用 poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意 poll 方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。

    这个只回收maximunPoolSize吧
  • jaryoung

    2019-08-28 19:36:52

    课后题,获取一次设置成一个局部变量,局部变量属于线程安全,无惧
  • brianway

    2019-08-07 00:35:34

    所以通过重写TaskQueue的offer方法,达到的效果就是将无界队列变成了有界队列,且队列长度限制=当前线程数,不知道我理解的对不对。举个例子,corePoolSize=4,maximumPoolSize=10,那来一个任务起一个线程,直到线程数为4,然后再来的任务就入队列,如果队列里任务积累到4个,随着任务继续增多,会新起线程处理,队列长度限制也会依次变成5,6,7,8,一直到10。
  • calljson

    2019-07-02 22:10:44

    空闲线程到队列取任务,能否讲解下原理,最好附上源码,谢谢