09 | 调优一筹莫展,配置项速查手册让你事半功倍!(上)

你好,我是吴磊。

对于Spark性能调优来说,应用开发和配置项设置是两个最主要也最常用的入口。但在日常的调优工作中,每当我们需要从配置项入手寻找调优思路的时候,一打开Spark官网的Configuration页面,映入眼帘的就是上百个配置项。它们有的需要设置True或False,有的需要给定明确的数值才能使用。这难免让我们蒙头转向、无所适从。

所以我经常在想,如果能有一份Spark配置项手册,上面分门别类地记录着与性能调优息息相关的配置项就好了,肯定能省去不少麻烦。

那么,接下来的两讲,我们就来一起汇总这份手册。这份手册可以让你在寻找调优思路的时候,迅速地定位可能会用到的配置项,不仅有章可循,还能不丢不漏,真正做到事半功倍!

配置项的分类

事实上,能够显著影响执行性能的配置项屈指可数,更何况在Spark分布式计算环境中,计算负载主要由Executors承担,Driver主要负责分布式调度,调优空间有限,因此对Driver端的配置项我们不作考虑,我们要汇总的配置项都围绕Executors展开。那么,结合过往的实践经验,以及对官网全量配置项的梳理,我把它们划分为3类,分别是硬件资源类、Shuffle类和Spark SQL大类。

为什么这么划分呢?我们一一来说。

首先,硬件资源类包含的是与CPU、内存、磁盘有关的配置项。我们说过,调优的切入点是瓶颈,定位瓶颈的有效方法之一,就是从硬件的角度出发,观察某一类硬件资源的负载与消耗,是否远超其他类型的硬件,而且调优的过程收敛于所有硬件资源平衡、无瓶颈的状态,所以掌握资源类配置项就至关重要了。这类配置项设置得是否得当,决定了应用能否打破瓶颈,来平衡不同硬件的资源利用率。

其次,Shuffle类是专门针对Shuffle操作的。在绝大多数场景下,Shuffle都是性能瓶颈。因此,我们需要专门汇总这些会影响Shuffle计算过程的配置项。同时,Shuffle的调优难度也最高,汇总Shuffle配置项能帮我们在调优的过程中锁定搜索范围,充分节省时间。

最后,Spark SQL早已演化为新一代的底层优化引擎。无论是在Streaming、Mllib、Graph等子框架中,还是在PySpark中,只要你使用DataFrame API,Spark在运行时都会使用Spark SQL做统一优化。因此,我们需要梳理出一类配置项,去充分利用Spark SQL的先天性能优势。

我们一再强调硬件资源的平衡才是性能调优的关键,所以今天这一讲,我们就先从硬件资源类入手,去汇总应该设置的配置项。在这个过程中,我会带你搞清楚这些配置项的定义与作用是什么,以及它们的设置能解决哪些问题,让你为资源平衡打下基础。下一讲,我们再来讲Shuffle类和Spark SQL大类。

哪些配置项与CPU设置有关?

首先,我们先来说说与CPU有关的配置项,主要包括spark.cores.max、spark.executor.cores和spark.task.cpus这三个参数。它们分别从集群、Executor和计算任务这三个不同的粒度,指定了用于计算的CPU个数。开发者通过它们就可以明确有多少CPU资源被划拨给Spark用于分布式计算。

为了充分利用划拨给Spark集群的每一颗CPU,准确地说是每一个CPU核(CPU Core),你需要设置与之匹配的并行度,并行度用spark.default.parallelism和spark.sql.shuffle.partitions这两个参数设置。对于没有明确分区规则的RDD来说,我们用spark.default.parallelism定义其并行度,spark.sql.shuffle.partitions则用于明确指定数据关联或聚合操作中Reduce端的分区数量。

说到并行度(Parallelism)就不得不提并行计算任务(Paralleled Tasks)了,这两个概念关联紧密但含义大相径庭,有不少同学经常把它们弄混。

并行度指的是分布式数据集被划分为多少份,从而用于分布式计算。换句话说,并行度的出发点是数据,它明确了数据划分的粒度。并行度越高,数据的粒度越细,数据分片越多,数据越分散。由此可见,像分区数量、分片数量、Partitions这些概念都是并行度的同义词。

并行计算任务则不同,它指的是在任一时刻整个集群能够同时计算的任务数量。换句话说,它的出发点是计算任务、是CPU,由与CPU有关的三个参数共同决定。具体说来,Executor中并行计算任务数的上限是spark.executor.cores与spark.task.cpus的商,暂且记为#Executor-tasks,整个集群的并行计算任务数自然就是#Executor-tasks乘以集群内Executors的数量,记为#Executors。因此,最终的数值是:#Executor-tasks * #Executors。

我们不难发现,并行度决定了数据粒度,数据粒度决定了分区大小,分区大小则决定着每个计算任务的内存消耗。在同一个Executor中,多个同时运行的计算任务“基本上”是平均瓜分可用内存的,每个计算任务能获取到的内存空间是有上限的,因此并行计算任务数会反过来制约并行度的设置。你看,这两个家伙还真是一对相爱相杀的冤家!

至于,到底该怎么平衡并行度与并行计算任务两者之间的关系,我们留到后面的课程去展开。这里,咱们只要记住和CPU设置有关配置项的含义、区别与作用就行了。

哪些配置项与内存设置有关?

说完CPU,咱们接着说说与内存管理有关的配置项。我们知道,在管理模式上,Spark分为堆内内存与堆外内存。

堆外内存又分为两个区域,Execution Memory和Storage Memory。要想要启用堆外内存,我们得先把参数spark.memory.offHeap.enabled置为true,然后用spark.memory.offHeap.size指定堆外内存大小。堆内内存也分了四个区域,也就是Reserved Memory、User Memory、Execution Memory和Storage Memory。

内存的基础配置项主要有5个,它们的含义如下表所示:

简单来说,这些配置项决定了我们刚才说的这些区域的大小,这很好理解。工具有了,但很多同学在真正设置内存区域大小的时候还会有各种各样的疑惑,比如说:

  • 内存空间是有限的,该把多少内存划分给堆内,又该把多少内存留给堆外呢?
  • 在堆内内存里,该怎么平衡User Memory和Spark用于计算的内存空间?
  • 在统一内存管理模式下,该如何平衡Execution Memory和Storage Memory?

别着急,接下来,咱们一个一个来解决。

堆外与堆内的平衡

相比JVM堆内内存,off heap堆外内存有很多优势,如更精确的内存占用统计和不需要垃圾回收机制,以及不需要序列化与反序列化。你可能会说:“既然堆外内存这么厉害,那我们干脆把所有内存都划分给它不就得了?”先别急着下结论,我们先一起来看一个例子。

用户表1记录着用户数据,每个数据条目包含4个字段,整型的用户ID、String类型的姓名、整型的年龄和Char类型的性别。如果现在要求你用字节数组来存储每一条用户记录,你该怎么办呢?

我们一起来做一下。首先,除姓名外其它3个字段都是定长数据类型,因此可以直接安插到字节数组中。对于变长数据类型如String,由于我们事先并不知道每个用户的名字到底有多长,因此,为了把name字段也用字节数组的形式存储,我们只能曲线救国:先记录name字段的在整个字节数组内的偏移量,再记录它的长度,最后把完整的name字符串安插在字节数组的末尾,如下图所示。

尽管存储String类型的name字段麻烦一些,但我们总算成功地用字节数组容纳了每一条用户记录。OK,大功告成!

你可能会问:“做这个小实验的目的是啥呢?”事实上,Spark开辟的堆外内存就是以这样的方式来存储应用数据的。正是基于这种紧凑的二进制格式,相比JVM堆内内存,Spark通过Java Unsafe API在堆外内存中的管理,才会有那么多的优势。

不过,成也萧何败也萧何,字节数组自身的局限性也很难突破。比如说,如果用户表1新增了兴趣列表字段,类型为List[String],如用户表2所示。这个时候,如果我们仍然采用字节数据的方式来存储每一条用户记录,不仅越来越多的指针和偏移地址会让字段的访问效率大打折扣,而且,指针越多,内存泄漏的风险越大,数据访问的稳定性就值得担忧了。

因此,当数据模式(Data Schema)开始变得复杂时,Spark直接管理堆外内存的成本将会非常高。

那么,针对有限的内存资源,我们该如何平衡JVM堆内内存与off heap堆外内存的划分,我想你心中也该有了答案。对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用JVM堆内内存会更加稳妥。

User Memory与Spark可用内存如何分配?

接下来,我们再来说说User Memory。我们都知道,参数spark.memory.fraction的作用是明确Spark可支配内存占比,换句话说,就是在所有的堆内空间中,有多大比例的内存可供Spark消耗。相应地,1 - spark.memory.fraction就是User Memory在堆内空间的占比。

因此,spark.memory.fraction参数决定着两者如何瓜分堆内内存,它的系数越大,Spark可支配的内存越多,User Memory区域的占比自然越小。spark.memory.fraction的默认值是0.6,也就是JVM堆内空间的60%会划拨给Spark支配,剩下的40%划拨给User Memory。

那么,User Memory都用来存啥呀?需要预留那么大的空间吗?简单来说,User Memory存储的主要是开发者自定义的数据结构,这些数据结构往往用来协助分布式数据集的处理。

举个例子,还记得调度系统那一讲Label Encoding的例子吗?

/**
实现方式2
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
 
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
 
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")

在这个例子中,我们先读取包含用户兴趣的模板文件,然后根据模板内容构建兴趣到索引的映射字典。在对千亿样本做Lable Encoding的时候,这个字典可以快速查找兴趣字符串,并返回对应索引,来辅助完成数据处理。像这样的映射字典就是所谓的自定义数据结构,这部分数据都存储在User Memory内存区域。

因此,当在JVM内平衡Spark可用内存和User Memory时,你需要考虑你的应用中类似的自定义数据结构多不多、占比大不大?然后再相应地调整两块内存区域的相对占比。如果应用中自定义的数据结构很少,不妨把spark.memory.fraction配置项调高,让Spark可以享用更多的内存空间,用于分布式计算和缓存分布式数据集。

Execution Memory该如何与Storage Memory平衡?

最后,咱们再来说说,Execution Memory与Storage Memory的平衡。在内存管理那一讲,我给你讲了一个黄四郎地主招租的故事,并用故事中的占地协议类比了执行内存与缓存内存之间的竞争关系。执行任务与RDD缓存共享Spark可支配内存,但是,执行任务在抢占方面有更高的优先级。

因此通常来说,在统一内存管理模式下,spark.memory.storageFraction的设置就显得没那么紧要,因为无论这个参数设置多大,执行任务还是有机会抢占缓存内存,而且一旦完成抢占,就必须要等到任务执行结束才会释放。

不过,凡事都没有绝对,如果你的应用类型是“缓存密集型”,如机器学习训练任务,就很有必要通过调节这个参数来保障数据的全量缓存。这类计算任务往往需要反复遍历同一份分布式数据集,数据缓存与否对任务的执行效率起着决定性作用。这个时候,我们就可以把参数spark.memory.storageFraction调高,然后有意识地在应用的最开始把缓存灌满,再基于缓存数据去实现计算部分的业务逻辑。

但在这个过程中,你要特别注意RDD缓存与执行效率之间的平衡。为什么这么说呢?

首先,RDD缓存占用的内存空间多了,Spark用于执行分布式计算任务的内存空间自然就变少了,而且数据分析场景中常见的关联、排序和聚合等操作都会消耗执行内存,这部分内存空间变少,自然会影响到这类计算的执行效率。

其次,大量缓存引入的GC(Garbage Collection,垃圾回收)负担对执行效率来说是个巨大的隐患。

你还记得黄四郎要招租的土地分为托管田和自管田吗?托管田由黄四郎派人专门打理土地秋收后的翻土、整平等杂务,为来年种下一茬庄稼做准备。堆内内存的垃圾回收也是一个道理,JVM大体上把Heap堆内内存分为年轻代和老年代。年轻代存储生命周期较短、引用次数较低的对象;老年代则存储生命周期较长、引用次数高的对象。因此,像RDD cache这种一直缓存在内存里的数据,一定会被JVM安排到老年代。

年轻代的垃圾回收工作称为Young GC,老年代的垃圾回收称为Full GC。每当老年代可用内存不足时,都会触发JVM执行Full GC。在Full GC阶段,JVM会抢占应用程序执行线程,强行征用计算节点中所有的CPU线程,也就是“集中力量办大事”。当所有CPU线程都被拿去做垃圾回收工作的时候,应用程序的执行只能暂时搁置。只有等Full GC完事之后,把CPU线程释放出来,应用程序才能继续执行。这种Full GC征用CPU线程导致应用暂停的现象叫做“Stop the world”。

因此,Full GC对于应用程序的伤害远大于Young GC,并且GC的效率与对象个数成反比,对象个数越多,GC效率越差。这个时候,对于RDD这种缓存在老年代中的数据,就很容易引入Full GC问题。

一般来说,为了提升RDD cache访问效率,很多同学都会采用以对象值的方式把数据缓存到内存,因为对象值的存储方式避免了数据存取过程中序列化与反序列化的计算开销。我们在RDD/DataFrame/Dataset之上调用cache方法的时候,默认采用的就是这种存储方式。

但是,采用对象值的方式缓存数据,不论是RDD,还是DataFrame、Dataset,每条数据样本都会构成一个对象,要么是开发者自定义的Case class,要么是Row对象。换句话说,老年代存储的对象个数基本等于你的样本数。因此,当你的样本数大到一定规模的时候,你就需要考虑大量的RDD cache可能会引入的Full GC问题了。

基于上面的分析,我们不难发现,在打算把大面积的内存空间用于RDD cache之前,你需要衡量这么做可能会对执行效率产生的影响

你可能会说:“我的应用就是缓存密集型,确实需要把数据缓存起来,有什么办法来平衡执行效率吗?”办法还是有的。

首先,你可以放弃对象值的缓存方式,改用序列化的缓存方式,序列化会把多个对象转换成一个字节数组。这样,对象个数的问题就得到了初步缓解。

其次,我们可以调节spark.rdd.compress这个参数。RDD缓存默认是不压缩的,启用压缩之后,缓存的存储效率会大幅提升,有效节省缓存内存的占用,从而把更多的内存空间留给分布式任务执行。

通过这两类调整,开发者在享用RDD数据访问效率的同时,还能够有效地兼顾应用的整体执行效率,可谓是两全其美。不过,有得必有失,尽管这两类调整优化了内存的使用效率,但都是以引入额外的计算开销、牺牲CPU为代价的。这也就是我们一直强调的:性能调优的过程本质上就是不断地平衡不同硬件资源消耗的过程。

哪些配置项与磁盘设置有关?

在存储系统那一讲,我们简单提到过spark.local.dir这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储RDD cache落盘数据块和Shuffle中间文件。

通常情况下,spark.local.dir会配置到本地磁盘中容量比较宽裕的文件系统,毕竟这个目录下会存储大量的临时文件,我们需要足够的存储容量来保证分布式任务计算的稳定性。不过,如果你的经费比较充裕,有条件在计算节点中配备足量的SSD存储,甚至是更多的内存资源,完全可以把SSD上的文件系统目录,或是内存文件系统添加到spark.local.dir配置项中去,从而提供更好的I/O性能。

小结

掌握硬件资源类的配置项是我们打破性能瓶颈,以及平衡不同硬件资源利用率的必杀技。具体来说,我们可以分成两步走。

第一步,理清CPU、内存和磁盘这三个方面的性能配置项都有什么,以及它们的含义。因此,我把硬件资源类配置项的含义都汇总在了一个表格中,方便你随时查看。有了这份手册,在针对硬件资源进行配置项调优时,你就能够做到不重不漏。

第二步,重点理解这些配置项的作用,以及可以解决的问题。

首先,对于CPU类配置项,我们要重点理解并行度与并行计算任务数的区别。并行度从数据的角度出发,明确了数据划分的粒度,并行度越高,数据粒度越细,数据越分散,CPU资源利用越充分,但同时要提防数据粒度过细导致的调度系统开销。

并行计算任务数则不同,它从计算的角度出发,强调了分布式集群在任一时刻并行处理的能力和容量。并行度与并行计算任务数之间互相影响、相互制约。

其次,对于内存类配置项,我们要知道怎么设置它们来平衡不同内存区域的方法。这里我们主要搞清楚3个问题就可以了:

  1. 在平衡堆外与堆内内存的时候,我们要重点考察数据模式。如果数据模式比较扁平,而且定长字段较多,应该更多地使用堆外内存。相反地,如果数据模式比较复杂,应该更多地利用堆内内存
  2. 在平衡可支配内存和User memory的时候,我们要重点考察应用中自定义的数据结构。如果数据结构较多,应该保留足够的User memory空间。相反地,如果数据结构较少,应该让Spark享有更多的可用内存资源
  3. 在平衡Execution memory与Storage memory的时候,如果RDD缓存是刚需,我们就把spark.memory.storageFraction调大,并且在应用中优先把缓存灌满,再把计算逻辑应用在缓存数据之上。除此之外,我们还可以同时调整spark.rdd.compress和spark.memory.storageFraction来缓和Full GC的冲击

每日一练

  1. 并行度设置过大会带来哪些弊端?
  2. 在Shuffle的计算过程中,有哪些Spark内置的数据结构可以充分利用堆外内存资源?
  3. 堆外与堆内的取舍,你还能想到其他的制约因素吗?
  4. 如果内存资源足够丰富,有哪些方式可以开辟内存文件系统,用于配置spark.local.dir参数?

期待在留言区看到你的思考和答案,也欢迎你把这份硬件资源配置项手册分享给更多的朋友,我们下一讲见!

精选留言

  • Joe

    2021-04-29 15:58:20

    老师,针对spark.sql.shuffle.partitions的使用有一些疑问?

    1. 如果df1.join(df2),df1用的是hash partitioner并且分区数是3,这种情况在reduce端参数spark.sql.shuffle.partitions会生效吗?还是以df1的分区为准?

    2. 这个参数是针对所有的Dataframe,DataSet和SQL,还是只有SQL生效?

    作者回复

    先来回答第二个问题,这个参数对所有的DF、DS、SQL都生效。

    再来说第一个问题,例子蛮好的。先说答案,然后咱们再去分析:对于你这种情况,两张表都会Shuffle。当然,假设你的表都很大,超过广播阈值,能转换成Broadcast join那就另当别论了。

    为什么呢?spark.sql.shuffle.partitions默认是200。 这部分其实需要一些“前置引用”的知识,这些知识其实是在22讲Physical Planning才会涉及,所以在这一讲这里会比较难理解。

    是这样的,Spark SQL执行计划中的每一个节点,都有4个重要的属性,分别是:
    节点要求的输入:
    1)requiredChildDistribution
    2)requiredChildOrdering
    节点的输出:
    3)outputPartitioning
    4)outputOrdering

    每个节点都有输出的分区情况、排序情况,也就是3)、4);同时,每个节点对于自己的子节点,都有关于 分区和排序的要求,也就是1)、2)。

    当子节点的分区与排序情况,不满足当前节点的输入要求时,Spark SQL就会在Physical planning阶段,强行插入一些中间节点,比如Exchange(Shuffle)。

    回到你的问题,join要求子节点(df1、df2)的outputPartitioning是以joinKey为分区键,分成200个分区(因为spark.sql.shuffle.partitions默认值是200)。但是,你的df1,有3个分区;df2未知,因此,Spark SQL会在物理计划阶段给两个子节点,也就是df1、df2强行插入Exchange、也就是Shuffle。这两个Shuffle,会分别把df1、df2变成是有200个分区的分布式数据集。两个Shuffle做完之后,才会计算后面的Join。

    所以,最终的分区,不是df1的3、也不是df2原来的分区,而是spark.sql.shuffle.partitions参数设定的值。

    2021-04-29 19:12:56

  • 斯盖丸

    2021-04-16 15:27:43

    关于spark.sql.shuffle.partitions 老师实际工作中我发现这个参数不管用。比如我把它设成2000,并去读parquet文件,大约几百个文件吧,但我看task数量只有80个,而且还一直在变,有时会更少。网上说spark-sql的并行度不管用,要自己手动repartition,是这样吗?task数量一直在变是因为不断地在做groupBy和join吗?那为什么task数量始终达不到我设的spark.sql.shuffle.partitions = 2000呢?
    作者回复

    这块确实有点坑,只有你的计算中涉及Joins或是聚合,spark.sql.shuffle.partitions,这个参数的设置,才会影响Shuffle Reduce阶段的并行度。如果你的作业没有Joins或是聚合计算,那确实,这个参数设了也是摆设。

    比如你仅仅是读Parquet,然后想通过这个参数调整并行度,确实是徒劳,这个时候,你确实只能自己用repartition或是Coalesce去做重分区。

    2021-04-16 17:18:48

  • 断笔画墨

    2021-04-24 15:46:18

    spark读取oracle表,oracle表结构没有数值类型,大部分都是varchar,数据量上亿,怎么在源端做高并发读取啊
    作者回复

    我理解数据读取效率和字段类型没什么关系,要提高并行计算效率,可以考虑使用API:
    spark.read.jdbc(url, table, predicates, props)

    其中url就是你的Oracle DB
    table是表名
    props是各种属性,比如权限信息,如user,password

    predicates比较关键,是划分数据分区的谓词数组,比如["id between 1 and 100", "id between 101 and 200", ... "id between 901 and 1000"],本质上就是用这些过滤条件把查询分成多个,每个查询的结果都是一个数据分片。这么做的好处,是可以提高Spark的并行度,但是要注意,同时发这么多查询请求给Oracle,要注意Oracle数据库本身的并发处理能力。

    2021-04-25 10:17:17

  • Geek_d794f8

    2021-04-15 09:15:21

    Spark.task.cpus这个参数的设置,我之前理解就是一个cpu核运行一个task。难道还可以0.5个cpu或者多个cpu运行一个task吗?
    作者回复

    好问题,通常来说,这个参数都不需要动,默认就是1。回答你的问题,这个值,不能小于1。那么大于1是什么情况呢?就是你的task本身,是需要多线程操作的,比如一个线程把数据写到HDFS,另外一个线程,通过JDBC同时把数据塞进DBMS,诸如此类。

    Spark.task.cpus这个参数的意思,在于Spark为这种特殊的多线程task提供了一种开放的可能,允许你去设置大于1的cpu core。但允许不代表你一定要这么做哈

    特别注意的是,如果你的task没什么特别,但你还是设置的大于1的数值,那cpu就是白白浪费。spark只会去同时launch (spark.executor.cores / spark.task.cpus)这么多的tasks。

    2021-04-15 19:34:45

  • 快跑

    2021-04-13 09:52:04

    Class Student是存在User Memory? new Student("小明")是存在Executor Memory?
    作者回复

    这个其实取决于你把对象放在哪里。

    分两种情况来看哈~

    1. 如果你用RDD封装这些自定义类型,比如RDD[Student],那么,数据集消耗的是Execution memory。

    2. 相反,如果你是在处理分布式数据集的函数中,new Student来辅助计算过程,那么这个对象,是放在User memory里面的。

    2021-04-13 17:23:14

  • Geek_d794f8

    2021-04-15 09:12:12

    老师是不是可以这样理解,spark-submit提交任务的时候申请的总cores数为10,yarn调度系统会分配10个v-core,如果集群资源充足,实际上一个v-core就是对应一个cpu核,如果资源不够,相当于就不是一对一,此时集群最大的task并行度并不是10,而是并发度为10。
    以上理解对吗?
    作者回复

    并行度和并发度是两个完全不同的概念,一个数据视角——并行度;一个是计算视角——并发度,这个我们本讲应该有过介绍哈~

    另外你说的spark.executor.cores与v-core的对应关系是没问题的。但是cores也好,v-core也罢,物理上不见得对应的是一个物理CPU core,这个要看CPU的硬件配置,有些CPU只能起一个线程,不过大部分现代CPU都能起两个线程。所以spark.executor.cores与v-core,更准确的对应,是线程,而不是物理上的CPU core。这个地方其实有点绕,需要注意。

    2021-04-15 19:40:51

  • 快跑

    2021-04-19 17:59:42

    一个任务报错Container Killed by Yarn For Exceeding Memory Limits Consider boosting spark.yarn.executor.memoryOverhead
    我按照提示增加spark.yarn.executor.memoryOverhead,任务的确执行通过。
    请教老师spark.yarn.executor.memoryOverhead参数控制哪部分内存,主要负责什么?什么情况下会用到这部分内存。文章中关于内存的好像没有提到这部分。
    作者回复

    spark.executor.memoryOverhead ,在yarn、k8s部署模式下,container会预留一部分内存,形式是堆外,用来保证稳定性,主要存储nio buffer,函数栈等一些开销,所以你看名字:over head。这部分内存,他的目的是保持Spark作业运行时的稳定性。

    这个failure,报这个错,说明overhead空间不足,系统必须的开销没有足够的空间~ 调大就行了~ 这部分内存和执行性能关系不大,所以咱们课程里没有提。

    2021-04-20 09:51:15

  • 2021-04-12 09:17:03

    老师,这个自定义结构不是很懂,什么样的数据格式是自定义数据格式,我现在经常接触到的是spark-submit 提交sparksql这一块,用的是hive表,这个涉及自定义数据结构吗
    作者回复

    好问题,你可以这么来理解,凡是数据源,不管是Hive来的,还是HDFS来的,还是S3来的,不管是什么格式,比如Parquet、ORC,这些数据源,consume的都是执行内存,当然,如果他们被cache了,那就消耗Storage memory。

    那些开发者自己自定义的类、类型、数据结构、Struct,等等,这些东西往往用来辅助完成对于刚刚说的那些数据源的处理,这些辅助性的类、类型、数据结构、Struct,才是自定义数据结构。他们消耗的,就是user memory。

    2021-04-12 17:45:39

  • Sean

    2021-08-22 16:54:55

    根据老师的回回复,个人总结如下,不知是否正确:在不使用yarn,k8s模式下,完全没有必要启用off heap,而且在钨丝计划的加持下,可以理解为使用堆内内存,不会对任务有任何影响,但在使用yarn或k8s模式下,必须要开启off heap,否则会出现t Container Killed by Yarn For Exceeding Memory Limits Consider boosting spark.yarn.executor.memoryOverhead报错,需要调大spark.yarn.executor.memoryOverhead
    作者回复

    对的,总结的很到位~

    2021-08-23 15:09:55

  • 西南偏北

    2021-05-04 15:18:24

    并行度太高可能会造成任务调度耗时超过任务处理耗时,如果不进行后续分区合并,还有会造成小文件问题(比如写入Hive)
    作者回复

    满分💯

    2021-05-04 16:36:25

  • Geek_01fccd

    2021-05-11 08:47:40

    在hive on spark模式下,是否可以忽略user memory,设置spark.memory.fraction=0.9,这里应该不涉及用户自定义数据结构吧?
    作者回复

    对,通常来说,这个时候都是用SQL去做数据探索、数据分析,不需要User Memory的。

    不过,并不绝对哈,比如你的SQL需要某个自定义数组来过滤数据条目,这个时候这种自定义数组,会去消耗User Memory。

    2021-05-12 13:39:52

  • 勿更改任何信息

    2021-04-05 11:51:20

    如果我把堆外内存关闭,会不会导致spark sql执行失败?也就是spark sql有没有必须使用堆外内存的场景?
    作者回复

    不会的,用堆内更稳定,而且堆外默认是关闭的。没有必须使用堆外的场景,事实上,由于tungsten的优化,spark官方推荐使用堆内内存,在保证稳定性的同时,还可以大幅提升性能。

    2021-04-05 17:29:25

  • 铜镜

    2021-04-02 11:30:50

    1.资源浪费,并行度设置过大会增加调度开销,CPU利用率会变低
    2.不好意思没看明白这个是在问啥😁, int,char这种?还是dataframe这种?还是AppendOnlyMap这种?
    3.如果需要频繁操作反序列化出来的实际值的话,使用堆外内存会大幅增加cpu开销
    4.alluxio?
    作者回复

    四道题都答的不错~ 第三题,同样的操作,堆内也存在同样的问题。

    2021-04-03 14:27:12

  • 薛峰

    2021-04-10 15:16:31

    醍醐灌顶,真希望是在过去的面试之前看到这篇。
    作者回复

    有帮助就好哈~ 没事,以后面试机会多的是~ 😉

    2021-04-10 19:55:54

  • Geek_73cee2

    2022-06-10 01:52:47

    老师每个计算任务 是指每个stage还是同一个stage的不同分区对应的任务 还是其他的。。。有点学晕了
  • 夏国秦

    2022-04-05 15:42:47

    老师 请教一下 怎样才能够用scala去写spark写的很顺利 有不有好的学习资料推荐
  • Unknown element

    2021-12-10 14:42:46

    老师问下自己定义的dataframe是不是也属于自定义数据结构(从而存储在user memory)?
    作者回复

    不是的哈,DataFrame、RDD、Dataset,这些都是分布式数据集,自定义数据结构,指的是像数组、列表、map这些用户自定义,在分布式数据集当中引用的数据结构

    2021-12-10 23:21:41

  • 快跑

    2021-04-12 21:03:48

    老师你好,
    对于“对于没有明确分区规则的 RDD 来说,我们用 spark.default.parallelism 定义其并行度。”
    没有明确分区规则的 RDD是指什么,有什么场景属于没有分区规则的RDD?
    作者回复

    比如,你用parallelize创建的RDD

    2021-04-13 17:20:23

  • 斯盖丸

    2021-04-02 12:53:58

    这一讲有好多实操中不理解的问题,请老师不吝赐教。

    1 老师多次强调要确保硬件资源的平衡。但我们应该怎么查作业是否平衡呢?比如在Yarn上,我能看到我的job占用的总内存和总v-core(顺便问一下,一个v-core是一个CPU的核吗?),但我应该从哪个界面看出CPU是太空闲还是太忙了?

    2 开启堆外内存,Spark会把复杂的schema放到堆外内存吗?比如我读parquet表生成一个几亿条的DF,对它做了一些collect_list操作,那它会把简单的数据结构放堆里而把复杂的list放到堆外吗,要是反过来怎么办,岂不是更糟?

    3 自己写了一个函数,传入一个sparkSession,返回一个DF,我在外面用个val变量去接,请问在函数返回的最后一步要cache一下吗?因为我觉得这算是两个变量复用一个RDD了,应该要cache()。但我同事说不用cache的,我俩究竟谁对?麻烦老师都回答一下吧~
    作者回复

    先说第一题,spark ui看不到硬件资源消耗,需要结合第三方工具比如ganglia,或者其他的工具。或者你如果可以登陆worker节点,用系统命令也可以。

    第二题,spark其实不会自行判断数据结构复杂与否,堆外与堆内,如果堆外有空闲,他会优先选择堆外。需要注意的是,同一个task,不能脚踏两条船,不能同时用堆外和堆内,假设你最开始选择堆外,跑着跑着发现堆外不够用,这个时候,即使堆内还有空闲,task还是会oom。

    第三题,你同学说的对,你这个操作,不算rdd复用,不需要cache,不要滥用cache哈~ 你需要好好看看前面的rdd和单机思维那几篇🙂

    2021-04-03 14:36:34

  • Geek_197707

    2023-02-05 12:30:16

    老师,我是使用spark sql的, 有2个问题:
    问题1.User Memory:我看你的解释是自定义数据结构,那用的是纯spark sql,基本上读的数据都是Hive表,没有自定义数据结构,是不是可以把spark.memory.fraction调大
    问题2:Tungsten计划提出了使用二进制字节数组来代替JVM的java对象,我在看源码的时候,遇到了PartitionedAppendOnlyMap和ParititonedPairBuffer,他的实现也是数组,这两个是一个东西么