你好,我是吴磊。
对于Spark这样的内存计算引擎来说,内存的管理与利用至关重要。业务应用只有充分利用内存,才能让执行性能达到最优。
那么,你知道Spark是如何使用内存的吗?不同的内存区域之间的关系是什么,它们又是如何划分的?今天这一讲,我就结合一个有趣的小故事,来和你深入探讨一下Spark内存管理的基础知识。
内存的管理模式
在管理方式上,Spark会区分堆内内存(On-heap Memory)和堆外内存(Off-heap Memory)。这里的“堆”指的是JVM Heap,因此堆内内存实际上就是Executor JVM的堆内存;堆外内存指的是通过Java Unsafe API,像C++那样直接从操作系统中申请和释放内存空间。
其中,堆内内存的申请与释放统一由JVM代劳。比如说,Spark需要内存来实例化对象,JVM负责从堆内分配空间并创建对象,然后把对象的引用返回,最后由Spark保存引用,同时记录内存消耗。反过来也是一样,Spark申请删除对象会同时记录可用内存,JVM负责把这样的对象标记为“待删除”,然后再通过垃圾回收(Garbage Collection,GC)机制将对象清除并真正释放内存。

在这样的管理模式下,Spark对内存的释放是有延迟的,因此,当Spark尝试估算当前可用内存时,很有可能会高估堆内的可用内存空间。
堆外内存则不同,Spark通过调用Unsafe的allocateMemory和freeMemory方法直接在操作系统内存中申请、释放内存空间,这听上去是不是和C++管理内存的方式很像呢?这样的内存管理方式自然不再需要垃圾回收机制,也就免去了它带来的频繁扫描和回收引入的性能开销。更重要的是,空间的申请与释放可以精确计算,因此Spark对堆外可用内存的估算会更精确,对内存的利用率也更有把握。
为了帮助你更轻松地理解这个过程,我来给你讲一个小故事。
地主招租(上):土地划分
很久以前,燕山脚下有一个小村庄,村里有个地主,名叫黄四郎,四郎家有良田千顷,方圆数百里都是他的田地。黄四郎养尊处优,自然不会亲自下地种田,不过这么多田地也不能就这么荒着。于是,他想了个办法,既不用亲自动手又能日进斗金:收租子!
黄四郎虽然好吃懒做,但在管理上还是相当有一套的,他把田地划分为两块,一块叫“托管田”,另一块叫“自管田”。
我们知道,庄稼丰收之后,田地需要翻土、整平、晾晒,来年才能种下一茬庄稼。那么,托管田指的就是丰收之后,由黄四郎派专人帮你搞定翻土、整平这些琐事,不用你操心。相应的,自管田的意思就是庄稼你自己种,秋收之后的田地也得你自己收拾。
毫无疑问,对租户来说托管田更省心一些,自管田更麻烦。当然了,相比自管田,托管田的租金自然更高。

那么,这个故事中黄四郎的托管田就是内存管理中的堆内内存,自管田类比的则是堆外内存,田地的翻土、整平这些操作实际上就是JVM中的GC。这样类比起来是不是更好理解了呢?
内存区域的划分
故事先讲到这儿,让我们暂时先回到Spark的内存管理上。现在,我们知道了Spark内存管理有堆内和堆外两种模式,那Spark又是怎么划分内存区域的呢?
我们先来说说堆外内存。Spark把堆外内存划分为两块区域:一块用于执行分布式任务,如Shuffle、Sort和Aggregate等操作,这部分内存叫做Execution Memory;一块用于缓存RDD和广播变量等数据,它被称为Storage Memory。
堆内内存的划分方式和堆外差不多,Spark也会划分出用于执行和缓存的两份内存空间。不仅如此,Spark在堆内还会划分出一片叫做User Memory的内存空间,它用于存储开发者自定义数据结构。

除此之外,Spark在堆内还会预留出一小部分内存空间,叫做Reserved Memory,它被用来存储各种Spark内部对象,例如存储系统中的BlockManager、DiskBlockManager等等。
对于性能调优来说,我们在前三块内存的利用率上有比较大的发挥空间,因为业务应用主要消耗的就是它们,也即Execution memory、Storage memory和User memory。而预留内存我们却动不得,因为这块内存仅服务于Spark内部对象,业务应用不会染指。
好了,不同内存区域的划分与计算,我也把它们总结到了下面的表格中,方便你随时查阅。

执行与缓存内存
在所有的内存区域中,最重要的无疑是缓存内存和执行内存,而内存计算的两层含义也就是数据集缓存和Stage内的流水线计算,对应的就是Storage Memory和Execution Memory。
在Spark 1.6版本之前,Execution Memory和Storage Memory内存区域的空间划分是静态的,一旦空间划分完毕,不同内存区域的用途就固定了。也就是说,即便你没有缓存任何RDD或是广播变量,Storage Memory区域的空闲内存也不能用来执行Shuffle中的映射、排序或聚合等操作,因此宝贵的内存资源就被这么白白地浪费掉了。
考虑到静态内存划分潜在的空间浪费,在1.6版本之后,Spark推出了统一内存管理模式。统一内存管理指的是Execution Memory和Storage Memory之间可以相互转化,尽管两个区域由配置项spark.memory.storageFraction划定了初始大小,但在运行时,结合任务负载的实际情况,Storage Memory区域可能被用于任务执行(如Shuffle),Execution Memory区域也有可能存储RDD缓存。
但是,我们都知道,执行任务相比缓存任务,在内存抢占上有着更高的优先级。那你有没有想过这是为什么呢?接下来,就让我们带着“打破砂锅问到底”的精神,去探索其中更深层次的原因。
首先,执行任务主要分为两类:一类是Shuffle Map阶段的数据转换、映射、排序、聚合、归并等操作;另一类是Shuffle Reduce阶段的数据排序和聚合操作。它们所涉及的数据结构,都需要消耗执行内存。
我们可以先假设,执行任务与缓存任务在内存抢占上遵循“公正、公平和公开”的三原则。也就是说,不论谁抢占了对方的内存,当对方有需要时都会立即释放。比如说,刚开始双方的预设比例是五五开,但因为缓存任务在应用中比较靠后的位置,所以执行任务先占据了80%的内存空间,当缓存任务追赶上来之后,执行任务就需要释放30%的内存空间还给缓存任务。
这种情况下会发生什么?假设集群范围内总共有80个CPU,也就是集群在任意时刻的并行计算能力是80个分布式任务。在抢占了80%内存的情况下,80个CPU可以充分利用,每个CPU的计算负载都是比较饱满的,计算完一个任务,再去计算下一个任务。
但是,由于有30%的内存要归还给缓存任务,这意味着有30个并行的执行任务没有内存可用。也就是说会有30个CPU一直处在I/O wait的状态,没法干活!宝贵的CPU计算资源就这么白白地浪费掉了,简直是暴殄天物。
因此,相比于缓存任务,执行任务的抢占优先级一定要更高。说了这么多,我们为什么要弄清楚其中的原因呢?我认为,只有弄清楚抢占优先级的背后逻辑,我们才能理解为什么要同时调节CPU和内存的相关配置,也才有可能做到不同硬件资源之间的协同与平衡,这也是我们进行性能调优要达到的最终效果。
不过,即使执行任务的抢占优先级更高,但它们在抢占内存的时候一定也要遵循某些规则。那么,这些规则具体是什么呢?下面,咱们就接着以地主招租的故事为例,来说说Execution memory和Storage memory之间有哪些有趣的规则。
地主招租(下):租地协议
黄四郎招租的告示贴出去没多久,村子里就有两个年富力强的小伙子来租种田地。一个叫黄小乙,是黄四郎的远房亲戚,前不久来投奔黄四郎。另一个叫张麻子,虽是八辈贫农,小日子过得也算是蒸蒸日上。张麻子打算把田地租过来种些小麦、玉米这样的庄稼。黄小乙就不这么想,这小子挺有商业头脑,他把田地租过来准备种棉花、咖啡这类经济作物。
两个人摩拳擦掌都想干出一番事业,恨不得把黄四郎的地全都包圆!地不愁租,黄四郎自然是满心欢喜,但烦恼也接踵而至:“既要照顾小乙这孩子,又不能打击麻子的积极性,得想个万全之策”。
于是,他眼珠一转,计上心来:“按理说呢,咱们丈量土地之后,应该在你们中间划一道实线,好区分田地的归属权。不过呢,毕竟麻子你是本村的,小乙远道而来,远来即是客嘛!咱们对小乙还是得多少照顾着点”。张麻子心生不悦:“怎么照顾?”
黄四郎接着说:“很简单,把实线改为虚线,多劳者多得。原本呢,你们应该在分界线划定的那片田地里各自劳作。不过呢,你们二人的进度各不相同嘛,所以,勤奋的人,自己的田地种满了之后,可以跨过分界线,去占用对方还在空着的田地。”
黄小乙不解地问:“四舅,这不是比谁种得快吗?也没对我特殊照顾啊!”张麻子眉间也拧了个疙瘩:“如果种得慢的人后来居上,想要把被占的田地收回去,到时候该怎么办呢?”
黄四郎得意道:“刚才说了,咱们多多照顾小乙。所以如果麻子勤快、干活也快,先占了小乙的地,种上了小麦、玉米,小乙后来居上,想要收回自己的地,那么没说的,麻子得把多占的地让出来。不管庄稼熟没熟,麻子都得把地铲平,还给人家小乙种棉花、咖啡”。

黄四郎偷眼看了看两人的反应,继续说:“反过来,如果小乙更勤快,先占了麻子的地,麻子后来居上,想要收回,这个时候,咱们就得多照顾照顾小乙。小乙有权继续占用麻子的地,直到地上种的棉花、咖啡都丰收了,再把多占的地让出来。你们二位看怎么样?”
黄小乙听了大喜。张麻子虽然心里不爽,但也清楚黄四郎和黄小乙之间的亲戚关系,也不好再多说什么,心想:“反正我勤快些,先把地种满也就是了”。于是,三方击掌为誓,就此达成协议。
好啦,地主招租的故事到这里就讲完了。不难发现,黄小乙的地类比的是Execution Memory,张麻子的地其实就是Storage Memory。他们之间的协议其实就是Execution Memory和Storage Memory之间的抢占规则,一共可以总结为3条:
- 如果对方的内存空间有空闲,双方就都可以抢占;
- 对于RDD缓存任务抢占的执行内存,当执行任务有内存需要时,RDD缓存任务必须立即归还抢占的内存,涉及的RDD缓存数据要么落盘、要么清除;
- 对于分布式计算任务抢占的Storage Memory内存空间,即便RDD缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。
同时,我也把这个例子中的关键内容和Spark之间的对应关系总结在了下面,希望能帮助你加深印象。

从代码看内存消耗
说完了理论,接下来,咱们再从实战出发,用一个小例子来直观地感受一下,应用中代码的不同部分都消耗了哪些内存区域。
示例代码很简单,目的是读取words.csv文件,然后对其中指定的单词进行统计计数。
val dict: List[String] = List(“spark”, “scala”)
val words: RDD[String] = sparkContext.textFile(“~/words.csv”)
val keywords: RDD[String] = words.filter(word => dict.contains(word))
keywords.cache
keywords.count
keywords.map((_, 1)).reduceByKey(_ + _).collect
整个代码片段包含6行代码,咱们从上到下逐一分析。
首先,第一行定义了dict字典,这个字典在Driver端生成,它在后续的RDD调用中会随着任务一起分发到Executor端。第二行读取words.csv文件并生成RDD words。第三行很关键,用dict字典对words进行过滤,此时dict已分发到Executor端,Executor将其存储在堆内存中,用于对words数据分片中的字符串进行过滤。Dict字典属于开发者自定义数据结构,因此,Executor将其存储在User Memory区域。
接着,第四行和第五行用cache和count对keywords RDD进行缓存,以备后续频繁访问,分布式数据集的缓存占用的正是Storage Memory内存区域。在最后一行代码中,我们在keywords上调用reduceByKey对单词分别计数。我们知道,reduceByKey算子会引入Shuffle,而Shuffle过程中所涉及的内部数据结构,如映射、排序、聚合等操作所仰仗的Buffer、Array和HashMap,都会消耗Execution Memory区域中的内存。
不同代码与其消耗的内存区域,我都整理到了下面的表格中,方便你查看。

小结
深入理解内存管理的机制,有助于我们充分利用应用的内存,提升其执行性能。今天,我们重点学习了内存管理的基础知识。
首先是内存的管理方式。Spark区分堆内内存和堆外内存:对于堆外内存来说,Spark通过调用Java Unsafe的allocateMemory和freeMemory方法,直接在操作系统内存中申请、释放内存空间,管理成本较高;对于堆内内存来说,无需Spark亲自操刀而是由JVM代理。但频繁的JVM GC对执行性能来说是一大隐患。另外,Spark对堆内内存占用的预估往往不够精确,高估可用内存往往会为OOM埋下隐患。
其次是统一内存管理,以及Execution Memory和Storage Memory之间的抢占规则。它们就像黄四郎招租故事中黄小乙和张麻子的田地,抢占规则就像他们之间的占地协议,主要可以分为3条:
- 如果对方的内存空间有空闲,那么双方都可以抢占;
- 对RDD缓存任务抢占的执行内存,当执行任务有内存需要时,RDD缓存任务必须立即归还抢占的内存,其中涉及的RDD缓存数据要么落盘、要么清除;
- 对分布式计算任务抢占的Storage Memory内存空间,即便RDD缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。
最后是不同代码对不同内存区域的消耗。内存区域分为Reserved Memory、User Memory、Execution Memory和Storage Memory。其中,Reserved Memory用于存储Spark内部对象,User Memory用于存储用户自定义的数据结构,Execution Memory用于分布式任务执行,而Storage Memory则用来容纳RDD缓存和广播变量。
好了,这些就是内存管理的基础知识。当然了,与内存相关的话题还有很多,比如内存溢出、RDD缓存、内存利用率,以及执行内存的并行计算等等。在性能调优篇,我还会继续从内存视角出发,去和你探讨这些话题。
每日一练
- 你知道启用off-heap之后,Spark有哪些计算环节可以利用到堆外内存?你能列举出一些例子吗?
- 相比堆内内存,为什么在堆外内存中,Spark对于内存占用量的预估更准确?
- 结合我在下面给定的配置参数,你能分别计算不同内存区域(Reserved、User、Execution、Storage)的具体大小吗?

期待在留言区看到你的思考和答案,我们下一讲见!
精选留言
2021-05-12 15:31:44
2021-03-29 06:54:36
堆外内存中:Storage内存为10*0.6=6G,Execution内存为10*0.4=4G
2021-05-25 16:42:51
2021-04-18 21:06:16
1.tungsten中的page用于同一管理off-heap和on-heap,利用这个机制可否在spark runtime的时候shuffle同时使用堆内和堆外内存?
2.在cache rdd的时候是否能指定StorageLevel为off_heap在spark runtime时使用堆外内存,memory_only的情况下使用堆内内存,或者说在配置开启堆外内存的参数之后,所有内存都是走堆外内存,无法使用堆内内存
2021-03-31 15:29:21
开启堆外内存后,分配的内存空间是多大?这时候还会分配堆内内存吗?谢谢
2021-04-15 21:34:44
2021-05-03 22:14:12
缓存rdd:rdd.persist(StorageLevel.OFF_HEAP)
第二题:
因为堆内内存的申请和释放是由JVM来统一管理,对Spark来说是不那么透明可控的;而堆外内存需要调用Unsafe的allocateMemory和freeMemory方法来进行内存的申请和释放,完全由Spark来控制,所以估算会相对更精准。
第三题:
- Reserved:300M
- User:(20GB - 300MB) * (1 - 0.8)
- Execution:(20GB - 300MB) * 0.8 * (1 - 0.6) + 10GB * (1 - 0.6)
- Storage:(20GB - 300MB) * 0.8 * 0.6 + 10GB * 0.6
2022-03-29 22:38:42
1、spark中内存划分是逻辑上的,真正的管理还是在jvm。如user memory占用内存超过设定值,还是会占用框架内存。但框架内存会根据设定值让task做一些阻塞或spill操作,所以从这个层面上说,框架内存的值得正确设置,如用户不会用到大的list、map等内存集合,就要把用户内存空间设置得够小,以保证框架内存(执行内存+存储内存)足够大,避免不必要的阻塞或spill操作?
2、如果开启了堆外内存,即使堆外内存不够,堆内内存充足,task也只会用堆外内存而不会用堆内内存?
3、spark 2.x版本中如果开启了堆外内存,并设置了spark.memory.offHeap.size=500mb,在yarn上跑的话spark.executor.memoryOverhead除了默认需要的10%是否还有要加上这500mb,否则container不会分配堆外这500mb的内存?看网上说3.0以上就不用加了。
4、task会在哪些场景申请和释放内存呢?只是shuffle的场景吗?transformer场景会吗?
2021-08-24 10:33:38
1.在缓存rdd时,既然executor memory 和 storage memory 两块内存不可互相share,那是不是可以通过persist来指定呢,一部分rdd使用execm 一部分rdd使用storm呢?
2.只要不开启off heap,spark就无法使用off heap,包括yarn,k8s模式利用off heap提升稳定性也无法体现出来,一旦开启了off heap,执行任务也就是executor memory优先使用off heap,storage memory还是优先堆内内存,可以这样理解吗?
3.例如:spark executor如果配置了堆内和堆外各4GB,executor cores配置为2。那么该executor运行的第一个task只会使用堆外内存?调度来的第二个task,哪怕堆外剩余几十MB,它也会用堆外内存,如果第二个task发现堆外不够用,就会写磁盘,或清除部分堆外内存数据呢
4.shuffle 阶段的稳定性参数 spark.excludeOnFailure.application.fetchFailure.enabled 从官网描述上来看,这个参数对fetch failed会切换到别的节点,结合实际情况,在Map 阶段:Shuffle writer 按照 Reducer 的分区规则将中间数据写入本地磁盘过程中,刚好写人的datanode 的数据卷故障,但是并没有触发重试机制,而是一直runing状态,是不是可以通过启用application.fetchFailure.enabled来识别,目前使用的是物理机,这种情况也是偶尔发生一次,所以很难验证
2021-04-26 16:52:03
2021-03-29 16:07:02
2021-10-21 00:18:26
2021-11-13 10:44:07
2021-08-29 17:22:04
1.memoryOverhead这个参数不管是作用在堆内还是堆外,都是占用storage memory这部分内存吗,
2.磊哥的回复中提到 "不管堆外还是堆内,开发者用不到,spark也用不到,所以不用关心,千万不指望调这个参数去提升性能,它的目的是保持运行时的稳定性~",个人不太理解这句话的不用关心,因为有出现过oom overhead的问题,可以理解为是使用到了memoryOverhead,那么就需要去调整对应的memoryOverhead大小,"开发者用不到,spark也用不到",这句话我还没有get到,斗胆在问一下磊哥,是哪里用到了这个参数,来提升稳定性呢?
个人理解不够,给磊哥添麻烦了 o(╥﹏╥)o o(╥﹏╥)o o(╥﹏╥)o
2021-03-30 18:10:39
我这样子理解对么?
回答2:
堆内内存:因为spark只是将无用的对象引用删除,但是无用对象真正的回收还要依赖于JVM来管理。Spark只是做了标记,但是真正什么时候删除spark并不知道,这里存在一个时间差。
相比较堆外内存:spark自己做管理就可以清楚的知道当前还有多少内存空间可以使用。
回答3:
堆内:
Reserved: 300MB
User: 4GB
Execution: 6.4GB
Storage: 9.6GB
堆外:
Execution:6GB
Storage:4GB
2021-03-29 18:08:40
2. 堆内内存因为依赖JVM的GC进行内存回收,Spark不能保证内存空间已经被回收;堆外内存因为是手动进行内存的分配和释放,所以Spark能准确预估内存暂用量
3.
1. Reserved Memory: 300 MB
2. User Memory: 20 * 0.2 = 4 GB
3. Execution Memory: 20 * 0.8 * 0.4 + 10 * 0.4 = 10.4 GB
4. Storage Memory: 20 * 0.8 * 0.6 + 10 * 0.6 = 15.6 GB
2023-08-23 16:17:28
2022-08-18 20:01:58
1.executor是否也在A,B,C的机器上分别来执行呢(假设资源足够,我理解是按照上节课的移动代码但是不移动数据来分配)?
2. 对与在A机器上的executor,是只加载三分之一的数据到内存里吗?也就是磁盘块A的数据到内存中,executor是看不到磁盘块B,C的数据的是吗?
3. words.filter(word => dict.contains(word)) 对于这段代码的执行,dict分别在A,B,C三台机器的executor上的user memory存储,这一段的filter底层是for循环进行每个word过滤,是否是A,B,C三台机器各过滤三分之一的数据(例如有30亿个单词,A,B, C分别过滤自己磁盘块的10W条数据),并且并行执行呢?
2022-07-23 23:02:14
2022-07-02 15:11:39
-- 增大reducer一次性read大小
spark.reducer.maxSizeInFlight
-- 增加spill缓冲
set spark.shuffle.file.buffer