04 | DAG与流水线:到底啥叫“内存计算”?

你好,我是吴磊。

在日常的开发工作中,我发现有两种现象很普遍。

第一种是缓存的滥用。无论是RDD,还是DataFrame,凡是能产生数据集的地方,开发同学一律用cache进行缓存,结果就是应用的执行性能奇差无比。开发同学也很委屈:“Spark不是内存计算的吗?为什么把数据缓存到内存里去,性能反而更差了?”

第二种现象是关于Shuffle的。我们都知道,Shuffle是Spark中的性能杀手,在开发应用时要尽可能地避免Shuffle操作。不过据我观察,很多初学者都没有足够的动力去重构代码来避免Shuffle,这些同学的想法往往是:“能把业务功能实现就不错了,费了半天劲去重写代码就算真的消除了Shuffle,能有多大的性能收益啊。”

以上这两种现象可能大多数人并不在意,但往往这些细节才决定了应用执行性能的优劣。在我看来,造成这两种现象的根本原因就在于,开发者对Spark内存计算的理解还不够透彻。所以今天,我们就来说说Spark的内存计算都有哪些含义?

第一层含义:分布式数据缓存

一提起Spark的“内存计算”的含义,你的第一反应很可能是:Spark允许开发者将分布式数据集缓存到计算节点的内存中,从而对其进行高效的数据访问。没错,这就是内存计算的第一层含义:众所周知的分布式数据缓存。

RDD cache确实是Spark分布式计算引擎的一大亮点,也是对业务应用进行性能调优的诸多利器之一,很多技术博客甚至是Spark官网,都在不厌其烦地强调RDD cache对于应用执行性能的重要性。

正因为考虑到这些因素,很多开发者才会在代码中不假思索地滥用cache机制,也就是我们刚刚提到的第一个现象。但是,这些同学都忽略了一个重要的细节:只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。

之所以会忽略这么重要的细节,背后深层次的原因在于,开发者对内存计算的理解仅仅停留在缓存这个层面。因此,当业务应用的执行性能出现问题时,只好死马当活马医,拼命地抓住cache这根救命稻草,结果反而越陷越深。

接下来,我们就重点说说内存计算的第二层含义:Stage内部的流水线式计算模式。

在Spark中,内存计算有两层含义:第一层含义就是众所周知的分布式数据缓存,第二层含义是Stage内的流水线式计算模式。关于RDD缓存的工作原理,我会在后续的课程中为你详细介绍,今天咱们重点关注内存计算的第二层含义就可以了。

第二层含义:Stage内的流水线式计算模式

很显然,要弄清楚内存计算的第二层含义,咱们得从DAG的Stages划分说起。在这之前,我们先来说说什么是DAG。

什么是DAG?

DAG全称Direct Acyclic Graph,中文叫有向无环图。顾名思义,DAG 是一种“图”。我们知道,任何一种图都包含两种基本元素:顶点(Vertex)和边(Edge),顶点通常用于表示实体,而边则代表实体间的关系。在Spark的DAG中,顶点是一个个RDD,边则是RDD之间通过dependencies属性构成的父子关系。

从理论切入去讲解DAG,未免枯燥乏味,所以我打算借助上一讲土豆工坊的例子,来帮助你直观地认识DAG。上一讲,土豆工坊成功地实现了同时生产 3 种不同尺寸的桶装“原味”薯片。但是,在将“原味”薯片推向市场一段时间以后,工坊老板发现季度销量直线下滑,不由得火往上撞、心急如焚。此时,工坊的工头儿向他建议:“老板,咱们何不把流水线稍加改造,推出不同风味的薯片,去迎合市场大众的多样化选择?”然后,工头儿把改装后的效果图交给老板,老板看后甚是满意。

不过,改造流水线可是个大工程,为了让改装工人能够高效协作,工头儿得把上面的改造设想抽象成一张施工流程图。有了这张蓝图,工头儿才能给负责改装的工人们分工,大伙儿才能拧成一股绳、劲儿往一处使。在上一讲中,我们把食材形态类比成RDD,把相邻食材形态的关系看作是RDD间的依赖,那么显然,流水线的施工流程图就是DAG。

因为DAG中的每一个顶点都由RDD构成,对应到上图中就是带泥的土豆potatosRDD,清洗过的土豆cleanedPotatosRDD,以及调料粉flavoursRDD等等。DAG的边则标记了不同RDD之间的依赖与转换关系。很明显,上图中DAG的每一条边都有指向性,而且整张图不存在环结构。

那DAG是怎么生成的呢?

我们都知道,在Spark的开发模型下,应用开发实际上就是灵活运用算子实现业务逻辑的过程。开发者在分布式数据集如RDD、 DataFrame或Dataset之上调用算子、封装计算逻辑,这个过程会衍生新的子RDD。与此同时,子RDD会把dependencies属性赋值到父RDD,把compute属性赋值到算子封装的计算逻辑。以此类推,在子RDD之上,开发者还会继续调用其他算子,衍生出新的RDD,如此往复便有了DAG。

因此,从开发者的视角出发,DAG的构建是通过在分布式数据集上不停地调用算子来完成的

Stages的划分

现在,我们知道了什么是DAG,以及DAG是如何构建的。不过,DAG毕竟只是一张流程图,Spark需要把这张流程图转化成分布式任务,才能充分利用分布式集群并行计算的优势。这就好比土豆工坊的施工流程图毕竟还只是蓝图,是工头儿给老板画的一张“饼”,工头儿得想方设法把它转化成实实在在的土豆加工流水线,让流水线能够源源不断地生产不同风味的薯片,才能解决老板的燃眉之急。

简单地说,从开发者构建DAG,到DAG转化的分布式任务在分布式环境中执行,其间会经历如下4个阶段:

  • 回溯DAG并划分Stages
  • 在Stages中创建分布式任务
  • 分布式任务的分发
  • 分布式任务的执行

刚才我们说了,内存计算的第二层含义在stages内部,因此这一讲我们只要搞清楚DAG是怎么划分Stages就够了。至于后面的3个阶段更偏向调度系统的范畴,所以我会在下一讲给你讲清楚其中的来龙去脉。

如果用一句话来概括从DAG到Stages的转化过程,那应该是:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages

接下来,我们还是以土豆工坊为例来详细说说这个过程。既然DAG是以Shuffle为边界去划分Stages,我们不妨先从上帝视角出发,看看在土豆工坊设计流程图的DAG中,都有哪些地方需要执行数据分发的操作。当然,在土豆工坊,数据就是各种形态的土豆和土豆片儿。

仔细观察上面的设计流程图,我们不难发现,有两个地方需要分发数据。第一个地方是薯片经过烘焙烤熟之后,把即食薯片按尺寸大小分发到下游的流水线上,这些流水线会专门处理固定型号的薯片,也就是图中从bakedChipsRDD到flavouredBakedChipsRDD的那条线。同理,不同的调料粉也需要按照风味的不同分发到下游的流水线上,用于和固定型号的即食薯片混合,也就是图中从flavoursRDD到flavouredBakedChipsRDD那条分支。

同时,我们也能发现,土豆工坊的DAG应该划分3个Stages出来,如图中所示。其中,Stage 0包含四个RDD,从带泥土豆potatosRDD到即食薯片bakedChipsRDD。Stage 1比较简单,它只有一个RDD,就是封装调味粉的flavoursRDD。Stage 2包含两个RDD,一个是加了不同风味的即食薯片flavouredBakedChipsRDD,另一个表示组装成桶已经准备售卖的桶装薯片bucketChipsRDD。

你可能会问:“费了半天劲,把DAG变成Stages有啥用呢?”还真有用!内存计算的第二层含义,就隐匿于从DAG划分出的一个又一个Stages之中。不过,要弄清楚Stage内的流水线式计算模式,我们还是得从Hadoop MapReduce的计算模型说起。

Stage中的内存计算

基于内存的计算模型并不是凭空产生的,而是根据前人的教训和后人的反思精心设计出来的。这个前人就是Hadoop MapReduce,后人自然就是Spark。

MapReduce提供两类计算抽象,分别是Map和Reduce:Map抽象允许开发者通过实现map 接口来定义数据处理逻辑;Reduce抽象则用于封装数据聚合逻辑。MapReduce计算模型最大的问题在于,所有操作之间的数据交换都以磁盘为媒介。例如,两个Map操作之间的计算,以及Map与Reduce操作之间的计算都是利用本地磁盘来交换数据的。不难想象,这种频繁的磁盘I/O必定会拖累用户应用端到端的执行性能。

那么,这和Stage内的流水线式计算模式有啥关系呢?我们再回到土豆工坊的例子中,把目光集中在即食薯片分发之前,也就是刚刚划分出来的Stage 0。这一阶段包含3个处理操作,即清洗、切片和烘焙。按常理来说,流水线式的作业方式非常高效,带泥土豆被清洗过后,会沿着流水线被传送到切片机,切完的生薯片会继续沿着流水线再传送到烘焙烤箱,整个过程一气呵成。如果把流水线看作是计算节点内存的话,那么清洗、切片和烘焙这3个操作都是在内存中完成计算的。

你可能会说:“内存计算也不过如此,跟MapReduce相比,不就是把数据和计算都挪到内存里去了吗?”事情可能并没有你想象的那么简单。

在土豆工坊的例子里,Stage 0中的每个加工环节都会生产出中间食材,如清洗过的土豆、土豆片、即食薯片。我们刚刚把流水线比作内存,这意味着每一个算子计算得到的中间结果都会在内存中缓存一份,以备下一个算子运算,这个过程与开发者在应用代码中滥用RDD cache简直如出一辙。如果你曾经也是逢RDD便cache,应该不难想象,采用这种计算模式,Spark的执行性能不见得比MapReduce强多少,尤其是在Stages中的算子数量较多的时候。

既然不是简单地把数据和计算挪到内存,那Stage内的流水线式计算模式到底长啥样呢?在Spark中,流水线计算模式指的是:在同一Stage内部,所有算子融合为一个函数,Stage的输出结果由这个函数一次性作用在输入数据集而产生。这也正是内存计算的第二层含义。下面,我们用一张图来直观地解释这一计算模式。

如图所示,在上面的计算流程中,如果你把流水线看作是内存,每一步操作过后都会生成临时数据,如图中的clean和slice,这些临时数据都会缓存在内存里。但在下面的内存计算中,所有操作步骤如clean、slice、bake,都会被捏合在一起构成一个函数。这个函数一次性地作用在“带泥土豆”上,直接生成“即食薯片”,在内存中不产生任何中间数据形态。

因此你看,所谓内存计算,不仅仅是指数据可以缓存在内存中,更重要的是让我们明白了,通过计算的融合来大幅提升数据在内存中的转换效率,进而从整体上提升应用的执行性能。

这个时候,我们就可以回答开头提出的第二个问题了:费劲去重写代码、消除Shuffle,能有多大的性能收益?

由于计算的融合只发生在Stages内部,而Shuffle是切割Stages的边界,因此一旦发生Shuffle,内存计算的代码融合就会中断。但是,当我们对内存计算有了多方位理解以后,就不会一股脑地只想到用cache去提升应用的执行性能,而是会更主动地想办法尽量避免Shuffle,让应用代码中尽可能多的部分融合为一个函数,从而提升计算效率。

小结

这一讲,我们以两个常见的现象为例,探讨了Spark内存计算的含义。

在Spark中,内存计算有两层含义:第一层含义就是众所周知的分布式数据缓存,第二层含义是Stage内的流水线式计算模式。

对于第二层含义,我们需要先搞清楚DAG和Stages划分,从开发者的视角出发,DAG的构建是通过在分布式数据集上不停地调用算子来完成的,DAG以Actions算子为起点,从后向前回溯,以Shuffle操作为边界,划分出不同的Stages。

最后,我们归纳出内存计算更完整的第二层含义:同一Stage内所有算子融合为一个函数,Stage的输出结果由这个函数一次性作用在输入数据集而产生。

每日一练

今天的内容重在理解,我希望你能结合下面两道思考题来巩固一下。

  1. 我们今天说了,DAG以Shuffle为边界划分Stages,那你知道Spark是根据什么来判断一个操作是否会引入Shuffle的呢?

  2. 在Spark中,同一Stage内的所有算子会融合为一个函数。你知道这一步是怎么做到的吗?

期待在留言区看到你的思考和答案,如果对内存计算还有很多困惑,也欢迎你写在留言区,我们下一讲见!

精选留言

  • 来世愿做友人 A

    2021-03-22 14:45:57

    问题1: rdd 会有 dep 属性,用来区分是否是 shuffle 生成的 rdd. 而 dep 属性的确定主要是根据子 rdd 是否依赖父 rdd 的某一部分数据,这个就得看他两的分区器(如果 tranf/action 有的话)。如果分区器一致,就不会产生 shuffle。
    问题2: 在 task 启动后,会调用 rdd iterator 进行算子链的递归生成,调用 stage 图中最后一个 rdd 的 compute 方法,一般如果是 spark 提供的 rdd,compute 函数大都会继续调用父 rdd 的 iterator 方法,直到到 stage 的根 rdd,一般都是 sourceRdd,比如 hadoopRdd,KakaRdd,就会返回 source iterator。开始返回,如果子rdd 是 map 转换的,就会组成 itr.map(f)。如果再下一个是 filter 转换,就会组成 itr.map(f1).filter(f2),以此类推。不知道这边理解对不对,有点绕
    作者回复

    Perfect!答得已经很完美了,不过咱们再进一步,第二题,假设不是rdd API,而是dataframe、dataset,spark对于同一个stage内的算子,会有哪些优化呢?

    2021-03-22 21:39:35

  • Shockang

    2021-03-23 23:20:26

    正如老师在文章里面提到的一样,Hadoop MapReduce使用硬盘来存储中间结果,而 Spark自从诞生以来就一直标榜自己是内存计算,可能有些同学会比较奇怪,为什么内存明显比硬盘快,MR 不去选择内存计算,实际上 MR 也有在使用内存的,比如环形缓冲区的存在就可以说明,之所以这样做,一个很重要的原因是 MR 诞生的年代(04 年)内存比较贵,后来随着科技发展,内存价格在不断下降,大家如果仔细研究就会发现比如 Spark 比如 Redis 等充分利用内存来计算的框架都是 10 年左右出现的,就是在这个时候内存价格开始大幅度下降的。我之所以说这么多,其实就想说明,事物的发展都是有规律的,大数据的背后也潜藏着各种规律,把握好这些规律,个人认为对于理解记忆各种不同的大数据技术都是很有帮助的。
    作者回复

    说得太好了!以史为鉴知兴替,就是这个道理。在纵向上把视角拉高,其实就更容易理解很多新概念、新办法、新框架、新引擎。说的真好,后续多讨论哈~

    2021-03-24 09:39:16

  • 西南偏北

    2021-05-03 12:01:14

    1. DAG以Shuffle划分Stages,Shuffle的产生主要通过宽依赖和窄依赖,而宽窄依赖主要通过不同的算子来产生,比如产生窄依赖的算子:map,flatMap,filter,mapPartitions,union;产生宽依赖的算子:cogroup,join,groupyByKey,reduceByKey,combineByKey,distinct,repartition
    2. 官网上看到过:WholeStageCodegen 全阶段代码生成将多个operators编译成一个Java函数来提升性能。
    作者回复

    两道题答得都不错~

    1. 宽窄依赖和算子类型,确实是判定Shuffle的主要依据,不过,还需要结合数据本身的分布来看。比如,你可以搜搜Collocated Join,这种Join情况是不会引入Shuffle的哈~
    2.WSCG这个提得很好,不过,这个是只有Spark SQL才能享受到的特性,也就是当你使用DataFrame、Dataset或是SQL进行开发的时候,才能享受到这个特性。对于纯粹的RDD API来说,所谓的“捏合”,其实是一种伪“捏合”,它是通过同一个Stage内部多个RDD算子compute函数嵌套的方式,来完成“捏合”。

    2021-05-04 16:22:10

  • Sansi

    2021-03-22 09:29:16

    内存计算的第二层含义真的算内存计算吗,mr不是也可以把spark的多个map操作放到一个map任务吗,我认为只是在api层面spark更简单
    作者回复

    好问题,我认为算的,多个操作在内存中完成统一的数据转换,我认为这就是内存计算。mr不同的map任务之间也是需要落盘的哟~ 更何况,同一stage内部,spark还有wscg这种优化,因此即便是同一个map stage之间的比拼,效率上spark也会比mr更好。

    2021-03-22 21:55:21

  • 对方正在输入。。。

    2021-03-22 10:02:16

    问题一:每个rdd会有个dependencies的属性,deps记录的是该rdd与父rdd之间的依赖关系,deps类型是Seq[dependency], 如果dependency类型是shuffleDenpendency,那么spark就会视其操作为shuffle操作,然后进行stage的切割。

    问题二:stage执行时,spark会调用该stage末尾rdd的iterator方法,然后iterator方法实现逻辑是:将该rdd的compute方法作用下父rdd的compute计算结果之上,从而得到该rdd的分区
    作者回复

    答得挺好~ 追问一句哈,第一题,spark怎么判断一个dependency是不是shuffle Dependency呢?

    2021-03-22 21:44:43

  • sparkjoy

    2021-07-13 11:14:55

    第一题,主要看父rdd的分区器是否一致,如果一致则生成子rdd的过程中不会产生shuffle
    作者回复

    是的~ 父子RDD的partitioner一致,就意味着他们会划分到同一个Stage~

    2021-07-14 16:13:09

  • Geek_18fe90

    2021-12-29 11:24:56

    spark shuffle前后的分区数是如何计算的
    作者回复

    Map阶段的并行度,会沿用父RDD的并行度,比如沿用HadoopRDD的并行度,这样的话,就是源文件原始的分片数量。Reduce阶段,可以通过repartition来调整,如果没有调整,默认按照spark.sql.shuffle.partitions来走~

    2022-01-01 10:26:17

  • 小学生敬亭山

    2021-03-26 17:13:32

    老师您好,我请教个问题。既然是大数据,那么假设数据很大,无论怎么分区或者分布式,单个机器的内存都放不下,那这个时候spark是怎么计算的呢?必然会有一部分在磁盘一部分在内存吧,这种情况spark是如何避免落盘,如何提升效率的呢。
    作者回复

    好问题,先来回答你的问题:

    1. 不是数据大于内存就会溢出到磁盘,取决于分片大小和每个task的可用内存。这部分在cpu、内存视角那几讲会详细展开,怎么平衡并行度、线程池、内存消耗。到时候可以关注一下哈~

    2. shuffle的过程确实有落盘的步骤,但也仅限shuffle操作。stage内部是流水线式的内存计算,不会有落盘的动作。

    2021-03-26 19:09:30

  • 斯盖丸

    2021-04-18 13:20:47

    请问下老师,spark里cache的正确姿势是什么?
    是直接df.cache()还是val cacheDf = df.cache()呢?另外不管cache还是persist都是lazy的,所以有必要紧接着一句df.count()让它马上执行吗?因为这样会平白无故多一个job,不知道是不是画蛇添足了
    作者回复

    df.cache()
    df.count

    或是
    val cacheDf = df.cache()
    cacheDf.count

    都可以,action是必需的,没有action,不会触发缓存的计算和存储,这可不是画蛇添足哈~

    2021-04-18 23:50:56

  • Wiggle Wiggle

    2021-04-13 00:31:37

    说个最极端的情况,如果对一个dataframe Read以后做了一堆不会触发shuffle 的操作,最后又调用了一下coalesce(1),然后write ,那是不是就意味着从读数据开始的所有操作都会在一个executor上完成?
    作者回复

    非常好的问题,这个edge case非常有意思,我们来细说说~ 取决于你如何调用coalesce(1, shuffle = false/true),分两种情况。

    1. shuffle = false,就像你说的,所有操作,从一开始,并行度都是1,都在一个executor计算,显然,这个时候,整个作业非常慢,奇慢无比

    2. shuffle = true,这个时候,coalesce就会引入shuffle,切割stage。coalesce之前,用源数据DataFrame的并行度,这个时候是多个Executors真正的并行计算;coalesce之后,也就是shuffle之后,并行度下降为1,所有父RDD的分区,全部shuffle到一个executor,交给一个task去计算。显然,相比前一种,这种实现在执行效率上,更好一些。因此,如果业务应用必须要这么做,推荐这一种实现方法。

    2021-04-13 17:42:05

  • Fendora范东_

    2021-04-02 21:25:00

    1.DAG以shuffle划分stage; 判断shuffle的依据是 rdd的deps属性是narrowDeps还是shuffleDeps; deps类型怎么得来的,肯定是构造rdd时生成的;构造rdd时依据什么来生成不同类型的deps呢,这块还没深究,猜测是根据算子类型,比如window func或者aggregator。
    2.所有算子融合到一起是通过全阶段代码生成。如果不能进行全阶段代码生成就进行基本表达式代码生成,但基本表达式代码生成每个算子处理逻辑还是分开的,所以磊哥能解释下仅仅进行基本表达式代码生成好处在哪嘛
    作者回复

    两道题都回答得挺完美~ expr代码生成是tungsten比较早期的,我理解是一种铺垫和过度,目的是辅助最终的wscg。单单是expr code gen,提升不了多少性能。

    2021-04-03 14:18:02

  • 小学生敬亭山

    2021-03-26 17:10:38

    问题1:逻辑层面上,如果聚焦在当前节点,看前1个节点和当前节点的关系。存在1对1,1对多,多对1,多对多四种可能。所谓shuffle就是不能链式调用了,需要用到上一步的多个节点。可以理解成上一步的数据要交出来混在一起又重新发出去。因此发生了网络传输或者落盘。多对1和多对多可能会shuffle.代码实现层面就是有 依赖关系可以在stage回溯的时候可以用。
    问题2.能融合得益于函数式编程的思想,可以链式调用,然后通过生成类似于树结构的语法分析,然后生成逻辑执行计划,物理执行计划。spark有所谓"钨丝计划"。然后更深入的优化内容,那我就说不清楚了。
    作者回复

    第一题答得不错,追问:spark怎么判断1对多、多对多呢?

    第二题很多点都说的很好,比如链式调用、catalyst优化、tungsten,不过逻辑有些混乱,可以关注后面的catalyst优化器和tungsten这两讲,把这里的思路理清楚哈~

    2021-03-26 19:21:43

  • Elon

    2021-03-23 13:12:30

    不得不说,这个土豆的例子可是太棒了~
    作者回复

    哈哈,喜欢就好哈~

    2021-03-24 09:35:13

  • 杜兰特有丶小帅

    2023-08-20 17:46:59

    老师,TEZ好像也是将MR任务分解成算子,然后在内存里计算的。我想问一下,spark和TEZ在优化MR上面有什么不同?
  • Geek_853ebe

    2022-10-21 17:01:47

    路人转粉,老师怎么加群呀
  • Geek_853ebe

    2022-10-21 17:00:43

    老师,请教下DataSet Api中的union一定会让数据变成一个分区吗?
    在线上有个卡死的任务,十几个select union在一起,看卡住的stage中,数据倾斜到了一个task中,其他task空跑,看时间线这个job中其他的action都是并发跑的很快。
  • 松花酿酒,春水煎茶

    2022-07-09 16:25:44

    问题一:分算子,根据宽窄依赖或者父子RDD partitioner来判断;
  • Gti

    2021-03-29 14:16:38

    map的结果不是都写到本地磁盘吗?reducer从hdfs去mapper的结果?
    作者回复

    map是写到本地盘啊,没说写到hdfs啊

    2021-03-29 18:39:41

  • Geek2014

    2021-03-27 23:40:16

    MR可以开发者自己手动在一个map方法里整合多个算子的功能啊,只是spark做了简化。

    问题1主要就是宽窄依赖的问题
    作者回复

    确实,mr的map阶段也能用多个算子,一来开发成本高,二来spark在同一个stage内部,还有whole stage code gen哈

    追问一下:spark怎么判断宽窄依赖呢?

    2021-03-29 08:48:39

  • Z宇锤锤

    2021-03-27 14:03:21

    是否需要进行shuffle。当RDD和父RDD的依赖关系是宽依赖是,就会进行数据的shuffle.
    作者回复

    没问题,追问一下:spark怎么判断宽窄依赖呢?

    2021-03-27 18:03:44