13 | 同样的本质,为何Spark可以更高效?

上一期我们讨论了Spark的编程模型,这期我们聊聊Spark的架构原理。和MapReduce一样,Spark也遵循移动计算比移动数据更划算这一大数据计算基本原则。但是和MapReduce僵化的Map与Reduce分阶段计算相比,Spark的计算框架更加富有弹性和灵活性,进而有更好的运行性能。

Spark的计算阶段

我们可以对比来看。首先和MapReduce一个应用一次只运行一个map和一个reduce不同,Spark可以根据应用的复杂程度,分割成更多的计算阶段(stage),这些计算阶段组成一个有向无环图DAG,Spark任务调度器可以根据DAG的依赖关系执行计算阶段。

还记得在上一期,我举了一个比较逻辑回归机器学习性能的例子,发现Spark比MapReduce快100多倍。因为某些机器学习算法可能需要进行大量的迭代计算,产生数万个计算阶段,这些计算阶段在一个应用中处理完成,而不是像MapReduce那样需要启动数万个应用,因此极大地提高了运行效率。

所谓DAG也就是有向无环图,就是说不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系方向执行,被依赖的阶段执行完成之前,依赖的阶段不能开始执行,同时,这个依赖关系不能有环形依赖,否则就成为死循环了。下面这张图描述了一个典型的Spark运行DAG的不同阶段。

从图上看,整个应用被切分成3个阶段,阶段3需要依赖阶段1和阶段2,阶段1和阶段2互不依赖。Spark在执行调度的时候,先执行阶段1和阶段2,完成以后,再执行阶段3。如果有更多的阶段,Spark的策略也是一样的。只要根据程序初始化好DAG,就建立了依赖关系,然后根据依赖关系顺序执行各个计算阶段,Spark大数据应用的计算就完成了。

上图这个DAG对应的Spark程序伪代码如下。

rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)

所以,你可以看到Spark作业调度执行的核心是DAG,有了DAG,整个应用就被切分成哪些阶段,每个阶段的依赖关系也就清楚了。之后再根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理,Spark就实现了大数据的分布式计算。

具体来看的话,负责Spark应用DAG生成和管理的组件是DAGScheduler,DAGScheduler根据程序代码生成DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。

那么Spark划分计算阶段的依据是什么呢?显然并不是RDD上的每个转换函数都会生成一个计算阶段,比如上面的例子有4个转换函数,但是只有3个阶段。

你可以再观察一下上面的DAG图,关于计算阶段的划分从图上就能看出规律,当RDD之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段。一个RDD代表一个数据集,图中每个RDD里面都包含多个小块,每个小块代表RDD的一个分片。

一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在MapReduce的运行过程中也看到过。

是的,这就是shuffle过程,Spark也需要通过shuffle将数据进行重新组合,相同Key的数据放在一起,进行聚合、关联等操作,因而每次shuffle都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行shuffle,并得到数据。

这里需要你特别注意的是,计算阶段划分的依据是shuffle,不是转换函数的类型,有的函数有时候有shuffle,有时候没有。比如上图例子中RDD B和RDD F进行join,得到RDD G,这里的RDD F需要进行shuffle,RDD B就不需要。

因为RDD B在前面一个阶段,阶段1的shuffle过程中,已经进行了数据分区。分区数目和分区Key不变,就不需要再进行shuffle。

这种不需要进行shuffle的依赖,在Spark里被称作窄依赖;相反的,需要进行shuffle的依赖,被称作宽依赖。跟MapReduce一样,shuffle也是Spark最重要的一个环节,只有通过shuffle,相关数据才能互相计算,构建起复杂的应用逻辑。

在你熟悉Spark里的shuffle机制后我们回到今天文章的标题,同样都要经过shuffle,为什么Spark可以更高效呢?

其实从本质上看,Spark可以算作是一种MapReduce计算模型的不同实现。Hadoop MapReduce简单粗暴地根据shuffle将大数据计算分成Map和Reduce两个阶段,然后就算完事了。而Spark更细腻一点,将前一个的Reduce和后一个的Map连接起来,当作一个阶段持续计算,形成一个更加优雅、高效的计算模型,虽然其本质依然是Map和Reduce。但是这种多个计算阶段依赖执行的方案可以有效减少对HDFS的访问,减少作业的调度执行次数,因此执行速度也更快。

并且和Hadoop MapReduce主要使用磁盘存储shuffle过程中的数据不同,Spark优先使用内存进行数据存储,包括RDD数据。除非是内存不够用了,否则是尽可能使用内存, 这也是Spark性能比Hadoop高的另一个原因。

Spark的作业管理

我在专栏上一期提到,Spark里面的RDD函数有两种,一种是转换函数,调用以后得到的还是一个RDD,RDD的计算逻辑主要通过转换函数完成。

另一种是action函数,调用以后不再返回RDD。比如count()函数,返回RDD中数据的元素个数;saveAsTextFile(path),将RDD数据存储到path路径下。Spark的DAGScheduler在遇到shuffle的时候,会生成一个计算阶段,在遇到action函数的时候,会生成一个作业(job)。

RDD里面的每个数据分片,Spark都会创建一个计算任务去处理,所以一个计算阶段会包含很多个计算任务(task)。

关于作业、计算阶段、任务的依赖和时间先后关系你可以通过下图看到。

图中横轴方向是时间,纵轴方向是任务。两条粗黑线之间是一个作业,两条细线之间是一个计算阶段。一个作业至少包含一个计算阶段。水平方向红色的线是任务,每个阶段由很多个任务组成,这些任务组成一个任务集合。

DAGScheduler根据代码生成DAG图以后,Spark的任务调度就以任务为单位进行分配,将任务分配到分布式集群的不同机器上执行。

Spark的执行过程

Spark支持Standalone、Yarn、Mesos、Kubernetes等多种部署方案,几种部署方案原理也都一样,只是不同组件角色命名不同,但是核心功能和运行流程都差不多。

上面这张图是Spark的运行流程,我们一步一步来看。

首先,Spark应用程序启动在自己的JVM进程里,即Driver进程,启动后调用SparkContext初始化执行配置和输入数据。SparkContext启动DAGScheduler构造执行的DAG图,切分成最小的执行单位也就是计算任务。

然后Driver向Cluster Manager请求计算资源,用于DAG的分布式计算。Cluster Manager收到请求以后,将Driver的主机地址等信息通知给集群的所有计算节点Worker。

Worker收到信息以后,根据Driver的主机地址,跟Driver通信并注册,然后根据自己的空闲资源向Driver通报自己可以领用的任务数。Driver根据DAG图开始向注册的Worker分配任务。

Worker收到任务后,启动Executor进程开始执行任务。Executor先检查自己是否有Driver的执行代码,如果没有,从Driver下载执行代码,通过Java反射加载后开始执行。

小结

总结来说,Spark有三个主要特性:RDD的编程模型更简单,DAG切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得Spark相对Hadoop MapReduce可以有更快的执行速度,以及更简单的编程实现。

Spark的出现和流行其实也有某种必然性,是天时、地利、人和的共同作用。首先,Spark在2012年左右开始流行,那时内存的容量提升和成本降低已经比MapReduce出现的十年前强了一个数量级,Spark优先使用内存的条件已经成熟;其次,使用大数据进行机器学习的需求越来越强烈,不再是早先年那种数据分析的简单计算需求。而机器学习的算法大多需要很多轮迭代,Spark的stage划分相比Map和Reduce的简单划分,有更加友好的编程体验和更高效的执行效率。于是Spark成为大数据计算新的王者也就不足为奇了。

思考题

Spark的流行离不开它成功的开源运作,开源并不是把源代码丢到GitHub上公开就万事大吉了,一个成功的开源项目需要吸引大量高质量开发者参与其中,还需要很多用户使用才能形成影响力。

Spark开发团队为Spark开源运作进行了大量的商业和非商业活动,你了解这些活动有哪些吗?假如你所在的公司想要开源自己的软件,用于提升自己公司的技术竞争力和影响力,如果是你负责人,你应该如何运作?

欢迎你写下自己的思考或疑问,与我和其他同学一起讨论。

精选留言

  • vivi

    2018-12-25 22:29:08

    懂了原理,实战其实很简单,不用急着学部署啊,操作,原理懂了,才能用好,我觉得讲得很好
    作者回复

    👍🏻

    2018-12-26 09:13:00

  • 张小喵

    2019-12-09 11:36:10

    把12|13反复读了4、5遍,以下是我的感悟
    Spark再次理解:
    1:Spark计算框架编程和运行速度比MapReduce更加的简单和快
    编程更简单:Spark编程关注的数据是RDD,RDD是个抽象的,比较不好理解,我的理解RDD是一次计算阶段中 要操作的所有的数据的抽象,虽然它们是分片的,并且分布在HDFS的任意节点上,但是概念上,我们是针对这个抽象的RDD编程的。使用Scala编程,wordcount只需要三行代码,很简单是吧,但是背后的整体的计算过程是相当的复杂的。这样看我们在MapReduce中的编程要是关注所有的数据的,默认为map的数据输入的数据是整个要处理的数据,并不是说,其他的应用就不知道了,毕竟我们也可以在代码中感知到在文件中的偏移量这种东西。
    速度更快:MapReduce暴力的把计算阶段分为两个阶段,Map和Reduce阶段, 如果一个应用的计算实现只有两个阶段,那么MapReduce计算框架的速度不会比Spark慢多少,慢的地方只是在于Spark是不经过落盘的操作的,直接在内存中存储,但是如果一个应用的计算阶段变得很多的话,比如机器学习中的迭代计算,那么使用MapReduce的就非常慢了,如果这个应用的计算分为10000个计算阶段,如果用MapReduce实现,就需要启动5000次相关的应用,速度很慢,并且编码会很麻烦,如果 是Spark,那么在一次应用就可以解决该问题。

    Spark的多个计算阶段的理解:
    相比于MapReduce只有两个计算阶段, Spark理论上可以有无限个计算阶段, 这也是Spark的速度的优势
    Spark的计算阶段的表示中,DAG(有向无环图)是Spark的关键,DAG可以很好的表示每个计算阶段的关系,或者说依赖书序
    那么DAG是谁生成的呢,是根据什么生成的呢?
    DAG是有Spark计算框架根据用于所写的代码生成的,那怎么依据代码的什么生成的呢?
    类比MapReduce的两个计算阶段,两个阶段之间的过度是什么?是shuffle,Spark也是根据Spark中的代码中的转换函数是否是有shuffle操作进行划分阶段的!
    我的理解,Spark的每个计算阶段可以类比MapReduce中的Map阶段或者Reduce阶段。不同的是,Spark计算阶段关注的是RDD,但是又有相同的点,RDD中的数据组成也是一片一片的,Spark中的最小的任务也就是对于片的计算,原理和MapReduce一样,Spark中的片和MapReduce中的片是通一个东西,每个片都是分布于HDFS上的,对于每个片的计算大概率也是在片所在的计算节点的。所以Spark的计算也是分布式,并且原理和MapReduce是一样的。

    Spark中RDD上的操作函数:RDD上的操作函数分为两种类型,一种是转换函数,另一种是action函数
    转换函数:Spark编程中对于RDD的操作基本是用转换函数来完成的,转换函数是计算RDD,转换函数又分为两种
    只是改变RDD内容的转换函数:类似map,filter函数,RDD本身物理上没有变化,所有的操作都是针对于当前的分片
    会新生成RDD转换函数:类似于reduceByKey这种函数,会组合key生成新的RDD(所有的会生成新的RDD的函数都可以作为 计算阶段的分割函数吗?)
    action函数:aciton函数对于RDD的操作没有返回值,或者说不会改变RDD的内容,比如rdd.saveToPath等等

    Spark中一些关于应用的生命周期中的过程的名词、概念:
    Spark中的RDD函数主要有两种,一是转换函数,调用转换函数可以返回一个RDD(产生新的RDD?/会shuffle的函数分割计算阶段),RDD的计算逻辑主要是由转换函数完成

    另一种是action函数,调用这种函数不返回RDD,DAGScheduler在遇到shuffle的时候生成一个新的计算阶段,在遇到action函数的时候,产生一个作业。(我理解可以类似于,一次MapReduce程序对应Spark中的一个作业)
    在每个计算阶段都是针对RDD(包含很多片)的计算,每个分片Spark都会创建一个计算任务去处理,所以每个计算阶段会包含很多个计算任务
  • 落叶飞逝的恋

    2018-11-27 18:44:48

    总结:Spark的优点就是能够动态根据计算逻辑的复杂度进行不断的拆分子任务,而实现在一个应用中处理所有的逻辑,而不像MapReduce需要启动多个应用进行计算。
  • scorpiozj

    2018-11-28 08:17:32

    移动计算比移动数据划算 总结的真好 很多设计仔细想一想都是围绕这个中心

    关于开源
    1 准备好详细的使用 api文档并提供示例
    2 撰写设计思路 和竞品比较的优势 以及创新点
    3 提前联系若干团队使用产品 并请他们提供真实的提高效率的数据报告
    4 联系公关团队在知名技术论坛推广
    5 成立团队 及时响应开发者疑问 需求和pr等
  • My dream

    2018-11-28 21:25:43

    老师,你讲的理论看的我头晕脑胀的,能不能讲点实战操作,搭建spark环境,通过案例来讲的话,对于我们这些初学学生来说是最直观,最容易弄明白它为什么会那么快,像你这样一味的讲理论,不讲实战,我们实在是吸收不了,理解不了你讲的这些知识点
  • 纯洁的憎恶

    2018-11-27 19:06:51

    这两天的内容对我来说有些复杂,很多知识点没有理解透。针对“而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效地计算模型”。这句话中“将前一个的 Reduce 和后一个的 Map 连接起来”在细节上该如何理解,这也是明显的串行过程,感觉不会比传统的MapReduce快?是因为不同阶段之间有可能并行么?
    作者回复

    引用楼下的评论回复

    落叶飞逝的恋
    总结:Spark的优点就是能够动态根据计算逻辑的复杂度进行不断的拆分子任务,而实现在一个应用中处理所有的逻辑,而不像MapReduce需要启动多个应用进行计算。

    2018-11-27 22:41:10

  • ming

    2018-12-08 11:48:28

    老师,有一句话我不太理解,请老师指导。“DAGScheduler 根据代码和数据分布生成 DAG 图”。根据代码生产DAG图我理解,但是为什么生成DAG图还要根据数据分布生成,数据分布不同,生成的DAG图也会不同吗?
    作者回复

    数据分布删掉,谢谢指正。

    2018-12-11 08:41:08

  • 张飞

    2019-03-01 09:14:04

    1.“而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效地计算模型”,stage之间是串行的,即便前一个的reduce和后一个的map连接起来,也是要从前一个stage的计算节点的磁盘上拉取数据的,这跟mapreduce的计算是一样的,老师所说的高效在这里是怎么提现的呢?
    2. spark的内存计算主要体现在shuffle过程,下一个stage拉取上一个stage的数据的时候更多的使用内存,在这里区分出与mapreduce计算的不同,别的还有什么阶段比mapreduce更依赖内存的吗?
    3.我是不是可以这样理解,如果只有map阶段的话,即便计算量很大,mapreduce与spark的计算速度上也没有太大的区别?
    可能问题问的不够清晰,希望老师解答一下。
    作者回复

    1 Spark的map和reduce的划分要更优雅一点,比如宽依赖和窄依赖,编程上看不出明显的map和reduce,这种优雅还有很多,多写一些spark和MapReduce程序就能感受到。

    2 如果内存够用,Spark几乎总是使用内存。

    3 可以这么理解。

    2019-03-04 09:27:39

  • arnm

    2018-11-28 07:56:47

    每一篇文章都认真的读了,有些东西还没真正的去在实际工作中体会到,但这种思维的启发还是受益匪浅。
  • weiruan85

    2019-07-31 11:05:16

    1.完备的技术说明文档是必须的,比如使用场景,常见问题,环境搭建,核心技术的原理等。
    2.输出真实等使用案例,以及给解决实际问题带来等好处,比如如果没有我们的开源方案是怎么实现的,有了这个方案是怎么实现的,差异是什么
    3.商业推广,找业界有名的公司站台,或者有名的技术大牛做宣传(头羊效应)
    4.归根结底,还是得有开创性的技术,能解决现实中的某一类问题。
    作者回复

    2019-08-01 15:47:55

  • 2019-03-07 08:03:16

    对于hbase和高速发展的es,不知道您怎么看,他们的优缺点是什么?
  • 白鸽

    2018-11-30 09:24:29

    Executor 从 Diver 下载执行代码,是整个程序 jar包?还是仅 Executor 计算任务对应的一段计算程序(经SparkSession初始化后的)?
    作者回复

    整个jar

    2018-12-01 22:59:33

  • 追梦小乐

    2018-11-27 19:27:14

    老师,我想请教几个问题:
    1、“根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理”,是一个任务集合TaskSet启动一个进程,taskSet里面的任务是用线程计算吗?还是每个TaskSet里面的任务启动一个进程?

    2、task-time 图中红色密集的表示什么?

    3、Spark 的执行过程 的图中 Executor 中的 Cache 是表示内存吗?task和Executor不是在内存中的吗?
    作者回复

    每个任务一个进程

    很多红色线条,每条线代表一个任务

    cache理解成存储rdd的内存

    2018-11-27 22:43:07

  • 落叶飞逝的恋

    2019-03-04 18:14:56

    现在回过头看,Spark的编程模型其实类似Java8的Steam编程模型
  • yang

    2018-11-27 08:41:57

    啊、老师现在的提问都好大,我现在是老虎吃天无从下爪啊 ^_^
    作者回复

    有些问题不一定要得到答案或者回答出来,只是关注到了思考一下,就会有收获~

    2018-11-27 15:44:05

  • weiruan85

    2019-07-31 10:33:49

    客户:我想给数据库中的一张关键表添加几个索引,对生产有没有影响
    fey:为什么要加索引呢,是张什么表
    客户:系统中的流水表,查询比较慢
    fey:数据量大概多大
    客户:有5000万
    fey:存储了多久的数据呢
    客户:存了将近2年的数据了
    fey:按照业务应该保存多久的数据呢
    客户:3个月
    fey:那我们是不是应该先把历史数据进行归档,然后在添加索引呢。
    客户:对,可以先做数据的归档。
  • Yezhiwei

    2019-01-14 10:12:04

    这里是学习过程中做的一些总结

    https://mp.weixin.qq.com/s/OyPRXAu9hR1KWIbvc20y1g
    作者回复

    👍👍👍

    2019-01-15 11:39:57

  • 蓬蒿

    2018-12-14 16:36:59

    “DAGScheduler 根据程序代码生成 DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。Worker 收到任务后,启动 Executor 进程开始执行任务。Executor 先检查自己是否有 Driver 的执行代码,如果没有,从 Driver 下载执行代码,通过 Java 反射加载后开始执行。” 针对这一段话,我想多请教老师一些,我理解的DAGScheduler 根据程序代码生成 DAG,类似于关系型数据库优化器根据SQL生成执行计划,然后spark计算引擎根据这些计划去做计算,我的疑惑的是:DAG已经是根据代码生成的了,那Worker 还要从 Driver 下载执行代码去执行,我无法想象worker是如何执行代码的,能否帮忙解疑一下?
    作者回复

    SQL的执行计划和spark的dag,都是执行描述,可以用文本查看的,不包括执行代码。

    worker下载应用程序的jar包,反射加载执行。第三模块spark源码优化有详细描述。

    2018-12-15 11:12:50

  • 星凡

    2019-10-06 12:13:56

    老师您好,请问一下,对于复杂的多阶段计算,Hadoop MapReduce是需要进行多次map reduce 过程吗,而且每次map和reduce之间一定会进行shuffle(主要使用磁盘进行存储),所以相比Spark才更加低效,不知道我理解的正确嘛,请指教
    作者回复

    是的,比如复杂的SQL,就会生成多个job,也就是多个map reduce过程。

    2019-10-09 14:31:55

  • 冰ྂ镇ྂ可ྂ乐ྂ

    2019-04-30 11:02:40

    之前讲mr时候也有提到生成dag,spark这里也是dag,二者的差异是mr中,map reduce为一组操作(可能没有reduce)的一个job job之间是依赖关系,而spark并非简单依照m r划分而是针对数据的处理情况,如果r后到下一个m是窄依赖,则属于同一个stage,属于一个流程,这样理解对吗?
    作者回复

    可以这样理解。

    MR模型本身不包含DAG,需要外部工具基于MR构建DAG实现复杂的计算,比如Hive。
    Spark的计算模型本身就是包含DAG。

    2019-05-05 09:40:05