36 | 生产者-消费者模式:用流水线思想提高效率

前面我们在《34 | Worker Thread模式:如何避免重复创建线程?》中讲到,Worker Thread模式类比的是工厂里车间工人的工作模式。但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者-消费者模式

生产者-消费者模式在编程领域的应用也非常广泛,前面我们曾经提到,Java线程池本质上就是用生产者-消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者-消费者模式。

当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者-消费者模式,例如Log4j2中异步Appender内部也用到了生产者-消费者模式。所以今天我们就来深入地聊聊生产者-消费者模式,看看它具体有哪些优点,以及如何提升系统的性能。

生产者-消费者模式的优点

生产者-消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者-消费者模式的一个示意图,你可以结合它来理解。

生产者-消费者模式示意图

从架构设计的角度来看,生产者-消费者模式有一个很重要的优点,就是解耦。解耦对于大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。在生产者-消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者-消费者模式是一个不错的解耦方案

除了架构设计上的优点之外,生产者-消费者模式还有一个重要的优点就是支持异步,并且能够平衡生产者和消费者的速度差异。在生产者-消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费是异步的,这是与传统的方法之间调用的本质区别,传统的方法之间调用是同步的。

你或许会有这样的疑问,异步化处理最简单的方式就是创建一个新的线程去处理,那中间增加一个“任务队列”究竟有什么用呢?我觉得主要还是用于平衡生产者和消费者的速度差异。我们假设生产者的速率很慢,而消费者的速率很高,比如是1:3,如果生产者有3个线程,采用创建新的线程的方式,那么会创建3个子线程,而采用生产者-消费者模式,消费线程只需要1个就可以了。Java语言里,Java线程和操作系统线程是一一对应的,线程创建得太多,会增加上下文切换的成本,所以Java线程不是越多越好,适量即可。而生产者-消费者模式恰好能支持你用适量的线程

支持批量执行以提升性能

前面我们在《33 | Thread-Per-Message模式:最简单实用的分工方法》中讲过轻量级的线程,如果使用轻量级线程,就没有必要平衡生产者和消费者的速度差异了,因为轻量级线程本身就是廉价的,那是否意味着生产者-消费者模式在性能优化方面就无用武之地了呢?当然不是,有一类并发场景应用生产者-消费者模式就有奇效,那就是批量执行任务。

例如,我们要在数据库里INSERT 1000条数据,有两种方案:第一种方案是用1000个线程并发执行,每个线程INSERT一条数据;第二种方案是用1个线程,执行一个批量的SQL,一次性把1000条数据INSERT进去。这两种方案,显然是第二种方案效率更高,其实这样的应用场景就是我们上面提到的批量执行场景。

《35 | 两阶段终止模式:如何优雅地终止线程?》文章中,我们提到一个监控系统动态采集的案例,其实最终回传的监控数据还是要存入数据库的(如下图)。但被监控系统往往有很多,如果每一条回传数据都直接INSERT到数据库,那么这个方案就是上面提到的第一种方案:每个线程INSERT一条数据。很显然,更好的方案是批量执行SQL,那如何实现呢?这就要用到生产者-消费者模式了。

动态采集功能示意图

利用生产者-消费者模式实现批量执行SQL非常简单:将原来直接INSERT数据到数据库的线程作为生产者线程,生产者线程只需将数据添加到任务队列,然后消费者线程负责将任务从任务队列中批量取出并批量执行。

在下面的示例代码中,我们创建了5个消费者线程负责批量执行SQL,这5个消费者线程以 while(true){} 循环方式批量地获取任务并批量地执行。需要注意的是,从任务队列中获取批量任务的方法pollTasks()中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。

//任务队列
BlockingQueue<Task> bq=new
  LinkedBlockingQueue<>(2000);
//启动5个消费者线程
//执行批量任务  
void start() {
  ExecutorService es=executors
    .newFixedThreadPool(5);
  for (int i=0; i<5; i++) {
    es.execute(()->{
      try {
        while (true) {
          //获取批量任务
          List<Task> ts=pollTasks();
          //执行批量任务
          execTasks(ts);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
//从任务队列中获取批量任务
List<Task> pollTasks() 
    throws InterruptedException{
  List<Task> ts=new LinkedList<>();
  //阻塞式获取一条任务
  Task t = bq.take();
  while (t != null) {
    ts.add(t);
    //非阻塞式获取一条任务
    t = bq.poll();
  }
  return ts;
}
//批量执行任务
execTasks(List<Task> ts) {
  //省略具体代码无数
}

支持分阶段提交以提升性能

利用生产者-消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。我曾经参与过一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:

  1. ERROR级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。

这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者-消费者模式如何实现。在下面的示例代码中,可以通过调用 info()error() 方法写入日志,这两个方法都是创建了一个日志任务LogMsg,并添加到阻塞队列中,调用 info()error() 方法的线程是生产者;而真正将日志写入文件的是消费者线程,在Logger这个类中,我们只创建了1个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。

class Logger {
  //任务队列  
  final BlockingQueue<LogMsg> bq
    = new BlockingQueue<>();
  //flush批量  
  static final int batchSize=500;
  //只需要一个线程写日志
  ExecutorService es = 
    Executors.newFixedThreadPool(1);
  //启动写日志线程
  void start(){
    File file=File.createTempFile(
      "foo", ".log");
    final FileWriter writer=
      new FileWriter(file);
    this.es.execute(()->{
      try {
        //未刷盘日志数量
        int curIdx = 0;
        long preFT=System.currentTimeMillis();
        while (true) {
          LogMsg log = bq.poll(
            5, TimeUnit.SECONDS);
          //写日志
          if (log != null) {
            writer.write(log.toString());
            ++curIdx;
          }
          //如果不存在未刷盘数据,则无需刷盘
          if (curIdx <= 0) {
            continue;
          }
          //根据规则刷盘
          if (log!=null && log.level==LEVEL.ERROR ||
              curIdx == batchSize ||
              System.currentTimeMillis()-preFT>5000){
            writer.flush();
            curIdx = 0;
            preFT=System.currentTimeMillis();
          }
        }
      }catch(Exception e){
        e.printStackTrace();
      } finally {
        try {
          writer.flush();
          writer.close();
        }catch(IOException e){
          e.printStackTrace();
        }
      }
    });  
  }
  //写INFO级别日志
  void info(String msg) {
    bq.put(new LogMsg(
      LEVEL.INFO, msg));
  }
  //写ERROR级别日志
  void error(String msg) {
    bq.put(new LogMsg(
      LEVEL.ERROR, msg));
  }
}
//日志级别
enum LEVEL {
  INFO, ERROR
}
class LogMsg {
  LEVEL level;
  String msg;
  //省略构造函数实现
  LogMsg(LEVEL lvl, String msg){}
  //省略toString()实现
  String toString(){}
}

总结

Java语言提供的线程池本身就是一种生产者-消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。

生产者-消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,你可以借助分布式消息队列(MQ)来实现生产者-消费者模式。MQ一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和Java的线程池非常类似(Java线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,你可以结合观察者模式实现广播功能。

课后思考

在日志组件异步刷盘的示例代码中,写日志的线程以 while(true){} 的方式执行,你有哪些办法可以优雅地终止这个线程呢?

this.writer.execute(()->{
  try {
    //未刷盘日志数量
    int curIdx = 0;
    long preFT=System.currentTimeMillis();
    while (true) {
    ......
    }
  } catch(Exception e) {}
}    

欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给更多的朋友。

精选留言

  • 2019-05-21 09:30:12

    在应用系统中,日志系统一般都是最后关闭的吧,因为它要为其他系统关闭提供写日志服务。所以日志系统关闭时需要把队列中所有日志都消费掉才能关闭。
    可能需要在关闭日志系统时投入一个毒丸,表示没有新的日志写入。线程池在消费到毒丸时知道没有日志写入,将所有的日志刷盘,break循环体。
    作者回复

    👍

    2019-05-21 12:10:39

  • PK時頭髮不亂

    2019-05-21 16:59:55

    极客时间有好多课程, 我觉得王老师的干货是最实际最可用的, 必须要赞一个。
    作者回复

    感谢感谢,有钱难买合适:)

    2019-05-21 23:04:55

  • êwěn

    2019-05-22 22:26:08


    之前遇到过一个生产问题,一个服务启动一段时间后就不停的超时,后面结合线程栈发现很多阻塞在打印日志的地方(我们用的就是log4j2),后面查到机子硬盘问题,io直接100%以上,日志刷盘满导致消费速度慢,队列撑满阻塞了写,这间接说明平衡好生产和消费速度以及适当的队列大小是很有必要。
    作者回复

    能快速定位的问题👍👍

    2019-05-23 21:16:11

  • 聂旋

    2019-05-23 21:01:50

    安卓的主线程中也是采用消息队列加消息循环方式,来处理用户输入及各种事件。当应用退出时,会发送一个处理对象为null的消息给队列,消息循环遇到这样的消息时就退出了。
  • 苏籍

    2019-05-22 11:00:43

    您好老师问个最近用到的线程池使用的问题
    我的工程是springboot的,在unitTest里(@SpringBootTest) 里调用了一个service A(通过@Autowired的)中的方法,A中启用了一个线程池,执行的任务 是往数据库里插入数据。但是总抛出数据源已经被关闭的异常,我理解的是在单测主线程已经结束,所以关闭了数据源这些清理工作,而此时线程池的线程还
    没结束,这个时候去调用数据源是null 的,不知道这么理解对不对,另外这个test主线程结束,为啥线程池的线程还没结束(通过打断点看到的)。这个怎么理解,求教
    作者回复

    只有守护线程才会自动结束,线程池的线程不是守护线程

    2019-05-22 22:34:58

  • Asanwos

    2019-08-08 09:02:05

    看到很多示例代码都没有关闭线程池的动作,难道局部的线程池就不要关闭吗?
    作者回复

    需要

    2019-08-10 17:31:13

  • Geek_c22199

    2020-04-06 22:37:49

    这段代码漂亮啊

    //阻塞式获取一条任务
    Task t = bq.take();
    while (t != null) {
    ts.add(t);
    //非阻塞式获取一条任务
    t = bq.poll();
    }
  • 生活发言权

    2019-09-18 22:00:00

    请问一下高并发场景,四个人拼一个团,怎么拼?
    1.db里记录拼团人数,如果小于4则直接update到拼团用户表,否则创建新的拼团id,新的记录。
    2.高并发场景,怎么保证读写db的一致性?redis和db双写?
    3.期待老师高见
    作者回复

    高并发下,我觉得主要是做好限流和缓存,保护好瓶颈资源数据库,限流和缓存的方案要看流量大小和系统架构

    2019-09-19 08:59:58

  • berthav_ss

    2019-06-11 05:57:26

    宝令老师,如何优雅的停止线程池中某一组线程呢?例如我在线程a中启动了1-10线程,线程b中启动了2-30线程,如何优雅停止1-10线程呢
    作者回复

    可以考虑一下毒丸的方式

    2019-06-12 22:06:04

  • 2020-06-03 11:30:17

    你好我想问下,生产者-消费者模式和worker thread模式有什么区别和联系?我看它们的核心都是任务队列,都是先把任务放到任务队列中,然后再从任务队列中获取任务并执行。
    作者回复

    几乎所有多线程程序都会有队列,所以从底层看都是一样的,但是从高层看,是对不同问题的抽象,设计过程是一个从高向低的过程,在高层次只关注模型而不关注实现。

    2020-11-04 07:51:08

  • null

    2019-07-26 14:37:51

    private BlockingQueue<X> bq = new LinkedBlockingQueue<>(1000);

    // 从任务队列中获取批量任务
    List<X> pollTasks() throws InterruptedException{
    List<X> ts=new LinkedList<>();

    X t = bq.take();
    while (t != null) {
    ts.add(t);
    t = bq.poll();
    }

    return ts;
    }

    -----

    需求背景:(一个线程往 bq 写数据,三个线程从 bq 读数据)
    1. 线程 A 从数据库批量读取数据,每次读 1000 条记录,然后在 for 循环内写入队列 bq.put(x)。
    2. 线程 B、线程 C、线程 D 调用 pollTasks() 方法获取数据列表,然后将数据列表做为参数,调用 Y 接口获取一批数据,最后进行业务运算。

    -----

    跑 demo 时发现 pollTasks() 方法有两个地方需要注意一下(一是:获取的列表数量不均,二是:退化成单元素列表):
    1. 线程 B、C、D 调用 pollTasks() 获得的列表,数据量不均匀,例如线程 B 只读取到 10+ 个元素,而线程 C 却读取了 1000+ 个元素。
    2. 如果我上游写入队列 bq 速度较慢(通过一些复杂的运算再写入 bq),这时下游通过 pollTasks() 获取的列表,几乎都是只有一个元素的列表。


    列表数据不均,可以增加返回列表的上限,或者增加超时机制。

    退化成单元素列表:
    1. pollTasks() 的调用方主动等待片刻,再获取数据。
    2. 修改 pollTasks() 的实现,返回列表的前提条件是:列表的 size 必须 batchSizeLimit 下限,否则等待超时 System.currentTimeMillis()-startMillis>1000。
  • 泛岁月的涟漪

    2019-05-21 09:34:13

    1、使用线程池的shutdown或者shutdownNow关闭线程池
    2、while循环条件设置为一个volatile boolean变量
    3、可以使用interrupt,但是线程是线程池管理的,没有消费者线程的引用中断不了
  • 曾轼麟

    2019-05-23 08:59:15

    使用Runtime提供的钩子,然后在关闭前,先让内部任务执行完毕,再释放资源
  • 晓杰

    2019-05-21 10:58:07

    35讲说到优雅地终止线程,首先需要线程状态转换为runnable状态(在终止刷盘的方法中调用Thread.interrupt()方法)
    然后可以通过设置标志位来让线程优雅终止,具体有两种方式:
    1、通过判断线程的中断状态Thread.currentThread.isInterrupted()
    2、设置自己的线程终止标志位,该标志位volatile修饰的共享变量。(这种方式需要在终止刷盘的方法中修改该共享变量的值)
  • ack

    2019-05-21 10:18:15

    public class Logger {
    ...

    volatile boolean stop;

    // 启动写日志线程
    void start() throws IOException {
    ...
    this.es.execute(() -> {
    try {
    // 未刷盘日志数量
    int curIdx = 0;
    long preFT = System.currentTimeMillis();
    while (!stop) {
    ...
    }
    } catch (InterruptedException e) {
    // 重新设置线程中断状态
    Thread.currentThread().interrupt();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    ...
    }
    });
    }
    ...
    void stop(){
    stop = true;
    es.shutdown();
    }
    }
  • 曾轼麟

    2019-05-23 09:07:40

    补充一下上面的留言,先通过创建的钩子去创建一个毒丸,然后释放资源
  • Geek_bbbda3

    2019-05-21 22:50:23

    volatile stoped;

    while(!stoped || bq.size() >0)
    {}

    public void shutdown(){
    Stoped =true;
    Es.shutdown();
    while(es.awaitUtilTime(5,timeutil.seconds){
    es.shutdownNow();

  • 张三

    2019-05-22 20:46:03

    还是不太懂,线程池的实现是有两种模式吗? Worker Thread 和 生产者-消费者 模式 ?
  • 兔斯基

    2019-05-21 07:50:01

    增加一个volatile标志位,刷盘结束后,判断标志位,这样不会影响数据落盘,但是可能会发起听之后5秒才结束。或者用线程中断方式,处理好中断异常以及中断标识即可
  • Geek_b38255

    2021-05-01 17:34:27

    老师你好,想请教下,生产者-消费者模式 与 Worker Thread 模式的区别是什么呢?
    作者回复

    很多模式底层实现都是生产者-消费者模式,模式更多地是与现实世界的类比,更多地是一种解决问题的思维方式。从类比的角度,Worker Thread 模式和生产者-消费者模式的区别就出来了,Worker Thread 模式更贴近现实,而生产者-消费者更多的是一种实现层面的模式。

    2021-05-04 16:19:14