01 | 性能调优的必要性:Spark本身就很快,为啥还需要我调优?

你好,我是吴磊。

在日常的开发工作中,我发现有个现象很普遍。很多开发者都认为Spark的执行性能已经非常强了,实际工作中只要按部就班地实现业务功能就可以了,没有必要进行性能调优。

你是不是也这么认为呢?确实,Spark的核心竞争力就是它的执行性能,这主要得益于Spark基于内存计算的运行模式和钨丝计划的锦上添花,以及Spark SQL上的专注与发力。

但是,真如大家所说,开发者只要把业务逻辑实现了就万事大吉了吗?这样,咱们先不急于得出结论,你先跟着我一起看两个日常开发中常见的例子,最后我们再来回答这个问题。

在数据应用场景中,ETL(Extract Transform Load)往往是打头阵的那个,毕竟源数据经过抽取和转换才能用于探索和分析,或者是供养给机器学习算法进行模型训练,从而挖掘出数据深层次的价值。我们今天要举的两个例子,都取自典型ETL端到端作业中常见的操作和计算任务。

开发案例1:数据抽取

第一个例子很简单:给定数据条目,从中抽取特定字段。这样的数据处理需求在平时的ETL作业中相当普遍。想要实现这个需求,我们需要定义一个函数extractFields:它的输入参数是Seq[Row]类型,也即数据条目序列;输出结果的返回类型是Seq[(String, Int)],也就是(String, Int)对儿的序列;函数的计算逻辑是从数据条目中抽取索引为2的字符串和索引为4的整型。

应该说这个业务需求相当简单明了,实现起来简直是小菜一碟。在实际开发中,我观察到有不少同学一上来就迅速地用下面的方式去实现,干脆利落,代码写得挺快,功能也没问题,UT、功能测试都能过。

//实现方案1 —— 反例
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => {
    var fields = Seq[(String, Int)]()
    rows.map(row => {
        fields = fields :+ (row.getString(2), row.getInt(4))
    })
  fields
  }
}

在上面这个函数体中,是先定义一个类型是Seq[(String, Int)]的变量fields,变量类型和函数返回类型完全一致。然后,函数逐个遍历输入参数中的数据条目,抽取数据条目中索引是2和4的字段并且构建二元元组,紧接着把元组追加到最初定义的变量fields中。最后,函数返回类型是Seq[(String, Int)]的变量fields。

乍看上去,这个函数似乎没什么问题。特殊的地方在于,尽管这个数据抽取函数很小,在复杂的ETL应用里是非常微小的一环,但在整个ETL作业中,它会在不同地方被频繁地反复调用。如果我基于这份代码把整个ETL应用推上线,就会发现ETL作业端到端的执行效率非常差,在分布式环境下完成作业需要两个小时,这样的速度难免有点让人沮丧。

想要让ETL作业跑得更快,我们自然需要做性能调优。可问题是我们该从哪儿入手呢?既然extractFields这个小函数会被频繁地调用,不如我们从它下手好了,看看有没有可能给它“减个肥、瘦个身”。重新审视函数extractFields的类型之后,我们不难发现,这个函数从头到尾无非是从Seq[Row]到Seq[(String, Int)]的转换,函数体的核心逻辑就是字段提取,只要从Seq[Row]可以得到Seq[(String, Int)],目的就达到了。

要达成这两种数据类型之间的转换,除了利用上面这种开发者信手拈来的过程式编程,我们还可以用函数式的编程范式。函数式编程的原则之一就是尽可能地在函数体中避免副作用(Side effect),副作用指的是函数对于状态的修改和变更,比如上例中extractFields函数对于fields变量不停地执行追加操作就属于副作用。

基于这个想法,我们就有了第二种实现方式,如下所示。与第一种实现相比,它最大的区别在于去掉了fields变量。之后,为了达到同样的效果,我们在输入参数Seq[Row]上直接调用map操作逐一地提取特定字段并构建元组,最后通过toSeq将映射转换为序列,干净利落,一气呵成。

//实现方案2 —— 正例
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => 
    rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}

你可能会问:“两份代码实现无非是差了个中间变量而已,能有多大差别呢?看上去不过是代码更简洁了而已。”事实上,我基于第二份代码把ETL作业推上线后,就惊奇地发现端到端执行性能提升了一倍!从原来的两个小时缩短到一个小时。两份功能完全一样的代码,在分布式环境中的执行性能竟然有着成倍的差别。因此你看,在日常的开发工作中,仅仅专注于业务功能实现还是不够的,任何一个可以进行调优的小环节咱们都不能放过。

开发案例2:数据过滤与数据聚合

你也许会说:“你这个例子只是个例吧?更何况,这个例子里的优化,仅仅是编程范式的调整,看上去和Spark似乎也没什么关系啊!”不要紧,我们再来看第二个例子。第二个例子会稍微复杂一些,我们先来把业务需求和数据关系交代清楚。

/**
(startDate, endDate)
e.g. ("2021-01-01", "2021-01-31")
*/
val pairDF: DataFrame = _
 
/**
(dim1, dim2, dim3, eventDate, value)
e.g. ("X", "Y", "Z", "2021-01-15", 12)
*/
val factDF: DataFrame = _
 
// Storage root path
val rootPath: String = _ 

在这个案例中,我们有两份数据,分别是pairDF和factDF,数据类型都是DataFrame。第一份数据pairDF的Schema包含两个字段,分别是开始日期和结束日期。第二份数据的字段较多,不过最主要的字段就两个,一个是Event date事件日期,另一个是业务关心的统计量,取名为Value。其他维度如dim1、dim2、dim3主要用于数据分组,具体含义并不重要。从数据量来看,pairDF的数据量很小,大概几百条记录,factDF数据量很大,有上千万行。

对于这两份数据来说,具体的业务需求可以拆成3步:

  1. 对于pairDF中的每一组时间对,从factDF中过滤出Event date落在其间的数据条目;
  2. 从dim1、dim2、dim3和Event date 4个维度对factDF分组,再对业务统计量Value进行汇总;
  3. 将最终的统计结果落盘到Amazon S3。

针对这样的业务需求,不少同学按照上面的步骤按部就班地进行了如下的实现。接下来,我就结合具体的代码来和你说说其中的计算逻辑。

//实现方案1 —— 反例
def createInstance(factDF: DataFrame, startDate: String, endDate: String): DataFrame = {
val instanceDF = factDF
.filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
.groupBy("dim1", "dim2", "dim3", "event_date")
.agg(sum("value") as "sum_value")
instanceDF
}
 
pairDF.collect.foreach{
case (startDate: String, endDate: String) =>
val instance = createInstance(factDF, startDate, endDate)
val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
instance.write.parquet(outPath)
} 

首先,他们是以factDF、开始时间和结束时间为形参定义createInstance函数。在函数体中,先根据Event date对factDF进行过滤,然后从4个维度分组汇总统计量,最后将汇总结果返回。定义完createInstance函数之后,收集pairDF到Driver端并逐条遍历每一个时间对,然后以factDF、开始时间、结束时间为实参调用createInstance函数,来获取满足过滤要求的汇总结果。最后,以Parquet的形式将结果落盘。

同样地,这段代码从功能的角度来说没有任何问题,而且从线上的结果来看,数据的处理逻辑也完全符合预期。不过,端到端的执行性能可以说是惨不忍睹,在16台机型为C5.4xlarge AWS EC2的分布式运行环境中,基于上面这份代码的ETL作业花费了半个小时才执行完毕。

没有对比就没有伤害,在同一份数据集之上,采用下面的第二种实现方式,仅用2台同样机型的EC2就能让ETL作业在15分钟以内完成端到端的计算任务。两份代码的业务功能和计算逻辑完全一致,执行性能却差了十万八千里

//实现方案2 —— 正例
val instances = factDF
.join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate") <= pairDF("endDate"))
.groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
.agg(sum("value") as "sum_value")
 
instances.write.partitionBy("endDate", "startDate").parquet(rootPath)

那么问题来了,这两份代码到底差在哪里,是什么导致它们的执行性能差别如此之大。我们不妨先来回顾第一种实现方式,嗅一嗅这里面有哪些不好的代码味道。

我们都知道,触发Spark延迟计算的Actions算子主要有两类:一类是将分布式计算结果直接落盘的操作,如DataFrame的write、RDD的saveAsTextFile等;另一类是将分布式结果收集到Driver端的操作,如first、take、collect。

显然,对于第二类算子来说,Driver有可能形成单点瓶颈,尤其是用collect算子去全量收集较大的结果集时,更容易出现性能问题。因此,在第一种实现方式中,我们很容易就能嗅到collect这里的调用,味道很差。

尽管collect这里味道不好,但在我们的场景里,pairDF毕竟是一份很小的数据集,才几百条数据记录而已,全量搜集到Driver端也不是什么大问题。

最要命的是collect后面的foreach。要知道,factDF是一份庞大的分布式数据集,尽管createInstance的逻辑仅仅是对factDF进行过滤、汇总并落盘,但是createInstance函数在foreach中会被调用几百次,pairDF中有多少个时间对,createInstance就会被调用多少次。对于Spark中的DAG来说,在没有缓存的情况下,每一次Action的触发都会导致整条DAG从头到尾重新执行。

明白了这一点之后,我们再来仔细观察这份代码,你品、你细品,目不转睛地盯着foreach和createInstance中的factDF,你会惊讶地发现:有着上千万行数据的factDF被反复扫描了几百次!而且,是全量扫描哟!吓不吓人?可不可怕?这么分析下来,ETL作业端到端执行效率低下的始作俑者,是不是就暴露无遗了?

反观第二份代码,factDF和pairDF用pairDF.startDate < factDF.eventDate <= pairDF.endDate的不等式条件进行数据关联。在Spark中,不等式Join的实现方式是Nested Loop Join。尽管Nested Loop Join是所有Join实现方式(Merge Join,Hash Join,Broadcast Join等)中性能最差的一种,而且这种Join方式没有任何优化空间,但factDF与pairDF的数据关联只需要扫描一次全量数据,仅这一项优势在执行效率上就可以吊打第一份代码实现。

小结

今天,我们分析了两个案例,这两个案例都来自数据应用的ETL场景。第一个案例讲的是,在函数被频繁调用的情况下,函数里面一个简单变量所引入的性能开销被成倍地放大。第二个例子讲的是,不恰当的实现方式导致海量数据被反复地扫描成百上千次。

通过对这两个案例进行分析和探讨,我们发现,对于Spark的应用开发,绝不仅仅是完成业务功能实现就高枕无忧了。Spark天生的执行效率再高,也需要你针对具体的应用场景和运行环境进行性能调优

而性能调优的收益显而易见:一来可以节约成本,尤其是按需付费的云上成本,更短的执行时间意味着更少的花销;二来可以提升开发的迭代效率,尤其是对于从事数据分析、数据科学、机器学习的同学来说,更高的执行效率可以更快地获取数据洞察,更快地找到模型收敛的最优解。因此你看,性能调优不是一件锦上添花的事情,而是开发者必须要掌握的一项傍身技能。

那么,对于Spark的性能调优,你准备好了吗?生活不止眼前的苟且,让我们来一场说走就走的性能调优之旅吧。来吧!快上车!扶稳坐好,系好安全带,咱们准备发车了!

每日一练

  1. 日常工作中,你还遇到过哪些功能实现一致、但性能大相径庭的案例吗?
  2. 我们今天讲的第二个案例中的正例代码,你觉得还有可能进一步优化吗?

期待在留言区看到你分享,也欢迎把你对开发案例的思考写下来,我们下节课见!

精选留言

  • Will

    2021-03-15 19:05:38

    第二个例子,可以利用map join,让小数据分发到每个worker上,这样不用shuffle数据
    作者回复

    没错,Broadcast joins可以进一步提升性能。

    2021-03-16 13:18:00

  • Fendora范东_

    2021-03-31 12:42:41

    请问磊哥,spark里面nested loop join和cartesian product jion有什么区别?
    作者回复

    nlj是一种join实现方式哈,和hash join、sort merge join一样,是一种join实现机制。cartesian join是一种join形式,和inner、left、right对等。每一种join形式,都可以用多种实现机制来做、来实现~

    2021-03-31 19:26:02

  • 西南偏北

    2021-05-01 12:18:27

    1. 其实有很多,比如用foreach算子将数据写入到外部数据库,导致每条数据的写入都会建立连接,另外单条写入也比批量写入的性能差很多。建议使用foreachPartition(),每个分区建立一个连接,同时可以批量写入,性能会好很多。
    2. 一般来讲,小表与大表的关联操作,首先要考虑Broadcast Join


    另外,关于Nested Loop Join的原理:https://www.geeksforgeeks.org/join-algorithms-in-database/amp/
    作者回复

    赞👍,满分💯,无可挑剔~

    2021-05-01 17:18:34

  • TaoInsight

    2021-03-17 09:12:34

    如果pai rDF的startDate和endDate范围有限,可以把日期范围展开,将非等值join转成等值join
    作者回复

    这块能展开说说吗?具体怎么转化为等值join?可以举个例子哈~

    2021-03-17 13:24:14

  • 慢慢卢

    2021-06-17 10:23:24

    老师,我把第二个例子自己试了一遍,有个问题不理解:两个df都只有一条数据,sparkui上第二个stage有200个task,为什么shuffle之后的stage的task有200个,虽然说shuffle之后reduce默认并行度是200,但我只有一条数据,实际上只需要一个task啊,其他的task是怎么产生的?
    作者回复

    空task~ 其实这个好理解,比如你对原始数据集做过滤,原来的数据有1万条,过滤之后1条,但是filter会继承之前的partitioner,也就是分区和之前是一样的,但其实很多分区中都没有数据,也就是空task。你这个例子也一样,并行度是200,实际上就是reducer的partitioner会划分出200的分区,partitioner是固定的,分区是一定要划出来的,但是实际上数据只有一条,其他的都是空task,白白浪费调度资源。

    这也是为什么Coalesce之后,要做filter,以及为什么AQE要做自动分区合并,道理其实都一样,都是为了避免空task白白浪费宝贵的CPU资源。

    2021-06-18 17:48:54

  • Elon

    2021-03-22 21:28:28

    函数式的副作用指的是不修改入参吧?在函数内部是可以定义变量、修改变量的。因此fields变量在函数内部,应该不算副作用吧?
    作者回复

    是的,你说的没错。函数的副作用指的是对外部变量、外部环境的影响,内部状态的改变和转换不算。文中这块表述的不严谨哈,这里主要是想强调可变变量fields带来的计算开销。

    2021-03-23 07:34:43

  • 葛聂

    2021-03-16 08:37:58

    Case 1为什么性能差一倍呢
    作者回复

    好问题,其实改动非常小,开销相比正例也不大,但这里的关键在于,这个函数会被反反复复调用上百次,累积下来,开销就上去了。所以,关键不在于点小不小,而是这个点,是不是瓶颈。

    2021-03-16 13:13:16

  • fsc2016

    2021-03-18 11:37:23

    请问老师,这个课程需要哪些基础,我平时使用过pysaprk 做过一些机器学习相关数据处理练习,对于我这种使用spark不多的,可以消化吸收嘛
    作者回复

    可以,没问题,接触过Spark就行。放心吧,原理部分会有大量的生活化类比和故事,尽可能地让你“边玩边学”。另外,咱们有微信群,有问题可以随时探讨~

    2021-03-18 15:05:17

  • 浩然

    2021-10-11 00:00:16

    简单啊。那个时间区间的,罗列出来,广播一下就完事了。从nest loop到hash join的跨越。
    我之前做Oracle优化的,所以第一反应是哈希join,第二反应是不等值到等值。
    作者回复

    精辟!满分💯,一针见血

    2021-10-11 22:19:45

  • 对方正在输入。。。

    2021-03-15 21:00:46

    可以先将pairdf collect到driver,再将数组按照startdate排序,然后再将其广播。然后在factdf.map里面实现一个方法来从广播的数组里面二分查找到eventdate所属的时间对子。最后就可以根据这个时间对子以及其他的维度属性进行分组聚合了
    作者回复

    广播的思路很赞。不过二分查找这里值得商榷哈,咱们目的是过滤出满足条件的event date,然后和其他维度一起、分组聚合。这里关键不在于过滤和查找效率,关键在于大表的重复扫描,只要解决这个核心痛点,性能问题就迎刃而解。

    2021-03-16 13:50:31

  • Geek_92df49

    2021-03-15 19:26:38

    四个维度分组为什么要加上开始时间和结束时间?
    .groupBy("dim1", "dim2", "dim3", "event_date", "startDate", "endDate")
    作者回复

    兄弟我是作者哈,你说的没错,分组只需要前4个字段,但是你看最后,instances.write.partitionBy("endDate", "startDate").parquet(rootPath),需要用开始和结束时间这两个字段去做分区存储,因此,在前一步分组的时候,把这两个字段保留了下来。

    2021-03-16 14:23:29

  • 光羽隼

    2024-05-31 17:53:19

    1、case 2 问题的关键应该是在于计算发生位置在哪里,反例中逻辑计算发生的位置是driver端,pairDF每循环一次都要把factDF拉到driver遍历一遍,一个factDF很大,拉取一遍就是一次IO的开销。正例中如果把pairdf广播到多个executor中,让计算发生在executor中,那么io开销就变小了。即时NLJ不是一个好的join实现机制,但是在次之前优化已经实现了。
    2、TaoInsight:如果pai rDF的startDate和endDate范围有限,可以把日期范围展开,将非等值join转成等值join;
    以文中的例子,如果pairDF的一条记录是(startDate, endDate) = ("2021-01-01", "2021-01-31"),explode成31条,(startDate, endDate, eventDate) = ("2021-01-01", "2021-01-31"), "2021-01-01"),("2021-01-01", "2021-01-31", "2021-01-02"), ... ,("2021-01-01", "2021-01-31", "2021-01-31"), 这样就可以基于eventDate字段进行等值关联。
    数据会有N倍的膨胀,不过只要N不太大,也可以这么搞
    这位大佬的思路很厉害,将pairdf每条数据按照时间范围扩大,pairDF一张表几百条数据,扩大几百倍也就几万几十万的数据,也是可以实现广播的。而且将数据扩大的另一个好处就是避免了NLJ,就可以实现等值i关联,那么1中说的NLJ性能不好的问题也解决了。
  • 刘吉超

    2021-03-24 13:25:26

    我们每天有9T数据,用如下代码做ETL json平铺,花很长时间
    val adArr = ArrayBuffer[Map[String, String]]()
    if (ads != null) {
    val adnum = ads.length
    for (i <- 0 until adnum) {
    val addObj = ads.getJSONObject(i)
    val adMap = THashMap[String, String]()
    addObj.entrySet().foreach(x => adMap += (x.getKey -> (if(x.getValue==null) "" else x.getValue.toString ) ))
    adArr += adMap.toMap
    }
    }
    基于老师讲的,避免副作用,改为如下代码
    import org.apache.flink.streaming.api.scala._
    import scala.collection.JavaConversions._
    val adArr = (0 until ads.size()).map(i => ads.getJSONObject(i).toMap.map(entry => entry._1 -> (if(entry._2==null) "" else entry._2.toString)))
    尝试后没啥效果,希望老师指导一下
    作者回复

    兄弟我是作者哈,第二份代码,我有几个疑问:
    1. 两个import语句的作用是什么?
    2. ads具体是什么内容?是RDD,还是数组,还是什么?
    3. 没有看到哪里定义分布式数据集,所有计算看上去是基于(0 until ads.size())这个List,那么后续所有的计算,map,map里面的toMap,都是在driver计算的,如果你9T数据都在driver计算,那结果。。。
    4. toMap之后,又加了个map,我理解是为了把value中的null替换为空字符串,如果是这样的话,map里面只处理value就好了,不用带着key

    不知道我理解的对不对哈,期待老弟提供更多信息~

    2021-03-24 21:44:17

  • LJK

    2021-03-17 17:25:57

    同一个application如果action多的话一定会影响效率吗?
    作者回复

    不一定的,action个数不是关键,关键是数据的访问效率。关于提升数据访问效率,咱们专栏后面的内容会讲哈~

    2021-03-18 15:02:32

  • LeoJC

    2023-07-24 16:08:09

    晚来的学习,感谢有你!针对第二个例子,如果pairDF数据过大,我有个想法是:把pairDF中时间转成时间戳再排序,然后按照某个阈值大小分桶,最终转换成map数据结构(key表示bucket index,value则记录原始日期段)再广播出去,fackDF每条记录遍历时按照同样的分桶算法拿到bocket index,就可以快速进行关联碰撞了。这样子一来,降低了参与计算的数据量。....不知道这想法中,有没有什么隐藏的性能问题呢?望指导
  • Geek_6bf244

    2023-06-09 19:41:07

    我有个最近有个地方用了nested loop join,跑得有点慢,不用的话sql会写的非常长,真的一点优化空间没有吗
  • 阿奎

    2022-10-27 09:27:54

    老师您好,我最近遇到一个奇怪的问题。我直接登录终端去执行spark submit任务可以执行成功,我在项目里是模仿的小海豚调度器在程序里去拼接的spark submit脚本然后生成的shell脚本,通过sudo -u hadoop集群的租户 sh command去执行的,这种方式提交的任务如果任务数据量小可以执行成功,任务数据量大就会失败,提示applicationmaster 接受到了kill的信号
  • 唐方刚

    2022-08-24 16:35:54

    弱弱的问个问题,val extractFields: Seq[Row] => Seq[(String, Int)] = {},一般定义方法,冒号后面不是返回值么,怎么这里冒号后面参数和返回值都有
  • 组织灵魂 王子健

    2022-06-20 22:44:11

    看到一个数据量大另一个数据量小,立刻马上就会想到广播变量。还有优化部分write用到了partitionBy这个细节必须注意
  • Geek_32772e

    2022-05-09 00:53:17

    这两个例子直接用sparkSQL不是更简单吗