17 | 内存视角(三):OOM都是谁的锅?怎么破?

你好,我是吴磊。

无论是批处理、流计算,还是数据分析、机器学习,只要是在Spark作业中,我们总能见到OOM(Out Of Memory,内存溢出)的身影。一旦出现OOM,作业就会中断,应用的业务功能也都无法执行。因此,及时处理OOM问题是我们日常开发中一项非常重要的工作。

但是,Spark报出的OOM问题可以说是五花八门,常常让人找不到头绪。比如,我们经常遇到,数据集按照尺寸估算本该可以完全放进内存,但Spark依然会报OOM异常。这个时候,不少同学都会参考网上的做法,把spark.executor.memory不断地调大、调大、再调大,直到内心崩溃也无济于事,最后只能放弃。

那么,当我们拿到OOM这个“烫手的山芋”的时候该怎么办呢?我们最先应该弄清楚的是“到底哪里出现了OOM”。只有准确定位出现问题的具体区域,我们的调优才能有的放矢。具体来说,这个“哪里”,我们至少要分3个方面去看。

  • 发生OOM的LOC(Line Of Code),也就是代码位置在哪?
  • OOM发生在Driver端,还是在Executor端?
  • 如果是发生在Executor端,OOM到底发生在哪一片内存区域?

定位出错代码的位置非常重要但也非常简单,我们只要利用Stack Trace就能很快找到抛出问题的LOC。因此,更关键的是,我们要明确出问题的到底是Driver端还是Executor端,以及是哪片内存区域。Driver和Executor产生OOM的病灶不同,我们自然需要区别对待。

所以今天这一讲,我们就先来说说Driver端的OOM问题和应对方法。由于内存在Executor端被划分成了不同区域,因此,对于Executor端怪相百出的OOM,我们还要结合案例来分类讨论。最后,我会带你整理出一套应对OOM的“武功秘籍”,让你在面对OOM的时候,能够见招拆招、有的放矢!

Driver端的OOM

我们先来说说Driver端的OOM。Driver的主要职责是任务调度,同时参与非常少量的任务计算,因此Driver的内存配置一般都偏低,也没有更加细分的内存区域。

因为Driver的内存就是囫囵的那么一块,所以Driver端的OOM问题自然不是调度系统的毛病,只可能来自它涉及的计算任务,主要有两类:

  • 创建小规模的分布式数据集:使用parallelize、createDataFrame等API创建数据集
  • 收集计算结果:通过take、show、collect等算子把结果收集到Driver端

因此Driver端的OOM逃不出2类病灶:

  • 创建的数据集超过内存上限
  • 收集的结果集超过内存上限

第一类病灶不言自明,咱们不细说了。看到第二类病灶,想必你第一时间想到的就是万恶的collect。确实,说到OOM就不得不提collect。collect算子会从Executors把全量数据拉回到Driver端,因此,如果结果集尺寸超过Driver内存上限,它自然会报OOM。

由开发者直接调用collect算子而触发的OOM问题其实很好定位,比较难定位的是间接调用collect而导致的OOM。那么,间接调用collect是指什么呢?还记得广播变量的工作原理吗?

广播变量在创建的过程中,需要先把分布在所有Executors的数据分片拉取到Driver端,然后在Driver端构建广播变量,最后Driver端把封装好的广播变量再分发给各个Executors。第一步的数据拉取其实就是用collect实现的。如果Executors中数据分片的总大小超过Driver端内存上限也会报OOM。在日常的调优工作中,你看到的表象和症状可能是:

java.lang.OutOfMemoryError: Not enough memory to build and broadcast

但实际的病理却是Driver端内存受限,没有办法容纳拉取回的结果集。找到了病因,再去应对Driver端的OOM就很简单了。我们只要对结果集尺寸做适当的预估,然后再相应地增加Driver侧的内存配置就好了。调节Driver端侧内存大小我们要用到spark.driver.memory配置项,预估数据集尺寸可以用“先Cache,再查看执行计划”的方式,示例代码如下。

val df: DataFrame = _
df.cache.count
val plan = df.queryExecution.logical
val estimated: BigInt = spark
.sessionState
.executePlan(plan)
.optimizedPlan
.stats
.sizeInBytes

Executor端的OOM

我们再来说说Executor端的OOM。我们知道,执行内存分为4个区域:Reserved Memory、User Memory、Storage Memory和Execution Memory。这4个区域中都有哪些区域会报OOM异常呢?哪些区域压根就不存在OOM的可能呢?

在Executors中,与任务执行有关的内存区域才存在OOM的隐患。其中,Reserved Memory大小固定为300MB,因为它是硬编码到源码中的,所以不受用户控制。而对于Storage Memory来说,即便数据集不能完全缓存到MemoryStore,Spark也不会抛OOM异常,额外的数据要么落盘(MEMORY_AND_DISK)、要么直接放弃(MEMORY_ONLY)。

因此,当Executors出现OOM的问题,我们可以先把Reserved Memory和Storage Memory排除,然后锁定Execution Memory和User Memory去找毛病。

User Memory的OOM

在内存管理那一讲,我们说过User Memory用于存储用户自定义的数据结构,如数组、列表、字典等。因此,如果这些数据结构的总大小超出了User Memory内存区域的上限,你可能就会看到下表示例中的报错。

java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
 
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

如果你的数据结构是用于分布式数据转换,在计算User Memory内存消耗时,你就需要考虑Executor的线程池大小。还记得下面的这个例子吗?

val dict = List(“spark”, “tune”)
val words = spark.sparkContext.textFile(“~/words.csv”)
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

自定义的列表dict会随着Task分发到所有Executors,因此多个Task中的dict会对User Memory产生重复消耗。如果把dict尺寸记为#size,Executor线程池大小记为#threads,那么dict对User Memory的总消耗就是:#size * #threads。一旦总消耗超出User Memory内存上限,自然就会产生OOM问题。

那么,解决User Memory 端 OOM的思路和Driver端的并无二致,也是先对数据结构的消耗进行预估,然后相应地扩大User Memory的内存配置。不过,相比Driver,User Memory内存上限的影响因素更多,总大小由spark.executor.memory * ( 1 - spark.memory.fraction)计算得到。

Execution Memory的OOM

要说OOM的高发区,非Execution Memory莫属。久行夜路必撞鬼,在分布式任务执行的过程中,Execution Memory首当其冲,因此出错的概率相比其他内存区域更高。关于Execution Memory的OOM,我发现不少同学都存在这么一个误区:只要数据量比执行内存小就不会发生OOM,相反就会有一定的几率触发OOM问题。

实际上,数据量并不是决定OOM与否的关键因素,数据分布与Execution Memory的运行时规划是否匹配才是。这么说可能比较抽象,你还记得黄小乙的如意算盘吗?为了提高老乡们种地的热情和积极性,他制定了个转让协议,所有老乡申请的土地面积介于1/N/2和1/N之间。因此,如果有的老乡贪多求快,买的种子远远超过1/N上限能够容纳的数量,这位老乡多买的那部分种子都会被白白浪费掉。

同样的,我们可以把Execution Memory看作是

土地,把分布式数据集看作是种子,一旦分布式任务的内存请求超出1/N这个上限,Execution Memory就会出现OOM问题。而且,相比其他场景下的OOM问题,Execution Memory的OOM要复杂得多,它不仅仅与内存空间大小、数据分布有关,还与Executor线程池和运行时任务调度有关。

抓住了引起OOM问题最核心的原因,对于Execution Memory OOM的诸多表象,我们就能从容应对了。下面,我们就来看两个平时开发中常见的实例:数据倾斜和数据膨胀。为了方便说明,在这两个实例中,计算节点的硬件配置是一样的,都是2个CPU core,每个core有两个线程,内存大小为1GB,并且spark.executor.cores设置为3,spark.executor.memory设置为900MB。

根据配置项那一讲我们说过的不同内存区域的计算公式,在默认配置下,我们不难算出Execution Memory和Storage Memory内存空间都是180MB。而且,因为我们的例子里没有RDD缓存,所以Execution Memory内存空间上限是360MB。

实例1:数据倾斜

我们先来看第一个数据倾斜的例子。节点在Reduce阶段拉取数据分片,3个Reduce Task对应的数据分片大小分别是100MB和300MB。显然,第三个数据分片存在轻微的数据倾斜。由于Executor线程池大小为3,因此每个Reduce Task最多可获得360MB * 1 / 3 = 120MB的内存空间。Task1、Task2获取到的内存空间足以容纳分片1、分片2,因此可以顺利完成任务。

Task3的数据分片大小远超内存上限,即便Spark在Reduce阶段支持Spill和外排,120MB的内存空间也无法满足300MB数据最基本的计算需要,如PairBuffer和AppendOnlyMap等数据结构的内存消耗,以及数据排序的临时内存消耗等等。

这个例子的表象是数据倾斜导致OOM,但实质上是Task3的内存请求超出1/N上限。因此,针对以这个案例为代表的数据倾斜问题,我们至少有2种调优思路:

  • 消除数据倾斜,让所有的数据分片尺寸都不大于100MB
  • 调整Executor线程池、内存、并行度等相关配置,提高1/N上限到300MB

每一种思路都可以衍生出许多不同的方法,就拿第2种思路来说,要满足1/N的上限,最简单地,我们可以把spark.executor.cores设置成1,也就是Executor线程池只有一个线程“并行”工作。这个时候,每个任务的内存上限都变成了360MB,容纳300MB的数据分片绰绰有余。

当然,线程池大小设置为1是不可取的,刚刚只是为了说明调优的灵活性。延续第二个思路,你需要去平衡多个方面的配置项,在充分利用CPU的前提下解决OOM的问题。比如:

  • 维持并发度、并行度不变,增大执行内存设置,提高1/N上限到300MB
  • 维持并发度、执行内存不变,使用相关配置项来提升并行度将数据打散,让所有的数据分片尺寸都缩小到100MB以内

关于线程池、内存和并行度之间的平衡与设置,我在CPU视角那一讲做过详细的介绍,你可以去回顾一下。至于怎么消除数据倾斜,你可以好好想想,再把你的思路分享出来。

实例2:数据膨胀

我们再来看第二个数据膨胀的例子。节点在Map阶段拉取HDFS数据分片,3个Map Task对应的数据分片大小都是100MB。按照之前的计算,每个Map Task最多可获得120MB的执行内存,不应该出现OOM问题才对。

尴尬的地方在于,磁盘中的数据进了JVM之后会膨胀。在我们的例子中,数据分片加载到JVM Heap之后翻了3倍,原本100MB的数据变成了300MB,因此,OOM就成了一件必然会发生的事情。

在这个案例中,表象是数据膨胀导致OOM,但本质上还是Task2和Task3的内存请求超出1/N上限。因此,针对以这个案例为代表的数据膨胀问题,我们还是有至少2种调优思路:

  • 把数据打散,提高数据分片数量、降低数据粒度,让膨胀之后的数据量降到100MB左右
  • 加大内存配置,结合Executor线程池调整,提高1/N上限到300MB

小结

想要高效解决五花八门的OOM问题,最重要的就是准确定位问题出现的区域,这样我们的调优才能有的放矢,我建议你按照两步进行。

首先,定位OOM发生的代码位置,你通过Stack Trace就能很快得到答案。

其次,定位OOM是发生在Driver端还是在Executor端如果是发生在Executor端,再定位具体发生的区域。

发生在Driver端的OOM可以归结为两类:

  • 创建的数据集超过内存上限
  • 收集的结果集超过内存上限

应对Driver端OOM的常规方法,是先适当预估结果集尺寸,然后再相应增加Driver侧的内存配置。

发生在Executors侧的OOM只和User Memory和Execution Memory区域有关,因为它们都和任务执行有关。其中,User Memory区域OOM的产生的原因和解决办法与Driver别无二致,你可以直接参考。

而Execution Memory区域OOM的产生的原因是数据分布与Execution Memory的运行时规划不匹配,也就是分布式任务的内存请求超出了1/N上限。解决Execution Memory区域OOM问题的思路总的来说可以分为3类:

  • 消除数据倾斜,让所有的数据分片尺寸都小于1/N上限
  • 把数据打散,提高数据分片数量、降低数据粒度,让膨胀之后的数据量降到1/N以下
  • 加大内存配置,结合Executor线程池调整,提高1/N上限

每日一练

  1. 数据膨胀导致OOM的例子中,为什么Task1能获取到300MB的内存空间?(提示:可以回顾CPU视角那一讲去寻找答案。)
  2. 在日常开发中,你还遇到过哪些OOM表象?你能把它们归纳到我们今天讲的分类中吗?

期待在留言区看到你的思考和分享,我们下一讲见!

精选留言

  • wow_xiaodi

    2021-08-07 16:21:36

    老师,从第一讲看到这里,貌似有个东西还没介绍,假设并行度、executor core和每个core均摊的execution memory都估算好了,那么我要几个executor,每个executor几个core好呢,就是说我executor多,然后每个executor的core少点,还是反过来好呢,是否有经验之谈?
    作者回复

    好问题,思考的很细致~ 一般来说,在这种情况,我们倾向于“广撒网”,就是Executors个数多一些,每个Executors的资源相对小一些,比较轻量。

    这么做的目的,主要是保证分布式计算的鲁棒性与扩展性,轻量Executors失败、重建的开销,要小于重量级的Executors。尤其是在开启了Dynamic Allocation的情况下,更是如此。

    2021-08-13 13:24:54

  • 苏子浩

    2021-04-27 01:04:01

    老师好,我有一下三个问题:
    (i)关于Task获取内存方面中“每个线程分到的可用内存有一定的上下限,下限是 M/N/2,上限是 M/N,也就是属于[M/(2*N), M/N].”,同时注意到上述的M和N其实是随着Executor中task的状态动态变化的,根据前文提到的“执行内存总量M动态变化,由于Execution Memory可以占用Storage Memory以及抢占的优先级,所以ExecutionMemory的下限是 Execution Memory初始值,上限是 spark.executor.memory * spark.memory.fraction”和“N~是Executor 内当前的并发度”。但是在具体的例子中怎么task的内存分配我不是很理解。比如本文中提到的“实例 1:数据倾斜”所提到的“Executor 线程池大小为 3,因此每个 Reduce Task 最多可获得 360MB * 1 / 3 = 120MB 的内存空间。Task1、Task2 获取到的内存空间足以容纳分片 1、分片 2,因此可以顺利完成任务。”我不理解的是,此时的Task是以“一批”的形式同时进入Executor吗?所以是“360MB/3=120MB”。为什么不是Task1‘到的早’,它刚来的时候N=1,所以它最多可以拿到整个执行内存,即360MB呢?但是Task1实际只需要100MB,所以分100MB给Task1。此时可用的‘动态执行内存总量’变成260MB(如果是这样,那么接着的内存构成是:(1)80MBStorage Memory + 180Execution Memory还是(2)180MBStorage Memory + 80MBExecution Memory?i.e.其实我想问的是在内存分配上的优先级是怎么分配?先去‘贪心’地抢先占用Storage Memory等用完以后再使用Execution Memory还是分配Execution Memory再开始用Execution Memory?根据您的黄小己招租种地的规则应该是先去占用自己所属的部分的内存吧?)
    (ii)同时,实例二:数据膨胀例子中,“task1之所以能拿到300MB,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。“那么task1实际是拿到了多少内存呢?是300MB还是360MB?是按需分配还是给360MB?如果是分配了300MB,那么此时“动态执行内存总量”变成了多少呢,是60MB吗?那么此时Task2进入时,假设Task3还没有进入,N等于2了,所以Task2分配到的执行是60 / 2 = 30MB吗?
    (iii)Executor中的Task是同时进入的吗?我的意思是Driver是否会一次性生成所有的的Task,并将Task全部都发送到Executor去执行?还是Driver不完全发送所有Task,根据Executor的并发度(基本上取决于Executor的cores个数)去发送,按照Executor的执行情况去发送Task,执行完一个Task再发送一个Task?虽然之前有提到其实Task本身是自带任务调度意愿的。
    打了很多字,确实自己没有想明白,麻烦老师了,不好意思!谢谢!
    作者回复

    都是很好的问题~ 没事,不用怕字数多,像这样的讨论多多益善~ 咱们一个一个来说。

    i)先说执行内存的上限,你也说了,是Execution Memory、外加Storage Memory空闲的部分。再说内存抢占,这两部分内存的消耗,一定是优先消耗Execution Memory,然后再去抢占Storage Memory,而且,抢占的时候,还只能抢Storage Memory空闲的部分(Storage Memory的空间也是动态变化的,比如中间有缓存任务往Storage Memory里面灌数据)。这块其实可以想想黄小乙的占地协议,他必须先把自己的地种满,然后才能去抢人家的地,不能说,一上来,就把人家麻子的地给占了,那也不公平嘛!再者,关于任务的内存消耗,思考的很深入,挺好~ 我们要区分两个概念,一个,是Task可以获取到的内存上限、也就是1/N;一个,是Task计算对于内存的消耗需要。换句话说,一个是供给上限,一个是计算需求。内存的分配,是按照需求来分配的,也就是说,你需要多少,就逐步给你分配多少。并不是说,你的上限是120MB,Memory Manager一下子就把这120MB全部划给你、预留给你了。1/N只是说明,你最多可以拿这么多,但是你具体消耗多少,取决于你手里有多少农作物的种子。这就好比,你手里就1亩地的种子,却想从黄小乙那里租赁10亩地,那黄小乙也不能同意呀!剩下的9亩地,不是白白浪费掉了么。

    ii)好像已经把问题ii)回答了,哈哈,就是你说的那样,消耗300MB,还剩60MB。如果Task2到了、Task3还没到,就是你算的结果60MB/2 = 30MB。

    iii)这个确实是个好问题,不过这个问题也暴露了你调度系统那一讲没吃透 😃,这部分其实要结合调度系统去理解。还记得DAGScheduler、TaskScheduler和SchedulerBackend吗?Executors的空闲状态,是由SchedulerBackend维护的,TaskScheduler拿到DAGScheduler拆解的Tasks之后,开始去SchedulerBackend那里尝试拿ResourceOffer,然后再从Tasks里面去挑,挑那些满足本地性级别要求的任务,然后再调度过去。你可以把DAGScheduler理解为任务执行的需求端,把SchedulerBackend理解成计算资源的供给端,而TaskScheduler就是中间的媒人、中介、撮合商。回答你的问题,首先,不是Driver一股脑把Tasks都分发过去,因为Tasks的执行需求,要先看SchedulerBackend有没有足够的资源供给。实际上,这个流程恰恰相反,是SchedulerBackend先提供ResourceOffer,然后TaskScheduler才能去挑Tasks,然后把任务调度过去。所以实际的调度情况,和你后者说的差不多,也就是你说的:“按照Executor的执行情况去发送Task”。任务调度确实比较难,除了前面的第5讲《调度系统》,还可以看看infoQ的这篇,这篇《权力的游戏》更详尽:https://www.infoq.cn/article/5aOHzQIaXX6NlHriLtSI

    2021-04-29 14:29:55

  • 王天雨

    2021-04-21 15:59:30

    1、执行内存总量是动态变化的,最大是spark.executor.memory * spark.memory.fraction
    本例中最大360M。
    其次并发度N是固定不变,但是Executor中当前并行执行的任务数是小于等于N的,
    上下限公式的计算是根据Executor中当前并行执行的并发度来计算的。
    因此先拿到任务的线程能够申请更多的资源,极端情况下,本例单个Task可享受360M内存。
    作者回复

    正解~ 赞~ 👍 老弟对于执行内存的(1/N/2,1/N)理解得相当到位~

    2021-04-21 23:55:28

  • sky_sql

    2021-04-21 20:09:15

    老师好!麻烦问下 Shuffle 文件寻址有个参数spark.reducer.maxSizeInFlight 默认48m,这个buffer缓冲每次拉取48m数据。是Execution Memory剩余部分不够48m就会oom吗?这个和1/N的有啥关联?
    作者回复

    好问题,你说的是对的,这部分buffer也算作Execution Memory的一部分,也会记到Execution Memory的“账上”。因此,如果像你说的这种edge case,连48M都没有了,那拉数据的时候确实会OOM。这部分就没有1/N的关系了哈,每个task都是这么大的buffer,一旦不够,也就OOM了。

    2021-04-22 19:06:55

  • 在路上

    2021-12-03 19:42:50

    老师好,请教下关于driver端oom问题,生产中遇到过这样问题,数据表数据量大,每次扫描近一年分区,几十个表关联场景,每次任务启动都一直初始化,有的时候还超时失败,任务一只run不起来,我记得没修改逻辑的时候是把driver调到100多g解决的。后续把代码扫描分区缩短了没有了,想问一下这种情况driver在计算分片吗?为啥这么久啊。
    作者回复

    老弟说的生产环境,我理解是物理上的分布式集群哈。如果是真正的分布式集群,那肯定就不是Driver在读数据。坦白地说,你说的这种情况,我觉得挺蹊跷的,把Driver端调大,就能解决问题,但是,把需要扫描的数据量缩小,问题也不见了。所以听上去,是数据量太大,可问题是,Driver并不需要扫描数据,需要扫描的数据量多少,也不会给Driver增减负担,所以这个就很奇怪了。结合你的描述:“每次任务启动都一直初始化,有的时候还超时失败,任务一只run不起来”,倒是有一种可能,就是DAG过于复杂(几十个表关联),导致任务调度失败(比如Task过大,Task分发超时,等等),不过这个需要老弟确认,你说的超时,具体是什么超时?Task分发超时,还是心跳连接超时?有了这些信息,更好判断一些~

    2021-12-04 17:56:25

  • 冯杰

    2021-10-08 18:27:18

    老师好,文章中提到:“Task3 的数据分片大小远超内存上限,即便 Spark 在 Reduce 阶段支持 Spill 和外排,120MB 的内存空间也无法满足 300MB 数据最基本的计算需要,如 PairBuffer 和 AppendOnlyMap 等数据结构的内存消耗,以及数据排序的临时内存消耗等等。”

    关于上述这段话有点疑问:
    1、shuffle read 阶段,reduce task去fetch数据时,是可以支持spill到磁盘的。 但在实际工作中,经常出现fetch fail的异常,增加内存后或者同等内存下换为堆外内存也可以解决问题;
    2、为什么支持spill操作,还会导致OOM呢? 看老师的解答是需要一个最基础的内存需求,比如:300MB的数据需要120M+50M内存,这块儿不是很明白
    作者回复

    好问题~

    其实这两个问题,都可以归结为一个问题,就是JVM堆内内存的使用与占用预估。其实课程中咱们提到过,就是Spark对于JVM堆内内存的估计是不够准确的,这主要是为了牺牲一定的精度,而照顾效率上的考虑。

    而堆外内存的计算,就不存在这样的隐患,堆外内存的计算,是非常精确的。因此,JVM堆内内存的OOM,相比堆外会更加的频发。

    Task之间的内存争抢,在“逻辑上”会尊重(M/N/2, M/N)的限制,但实际在JVM里面,不同Task所占用的内存,是没有明确边界的。再加上Spark对于内存预估的不准确,OOM也就在所难免。

    Spill,仅仅是在Spark的预估值明确表示内存不足的时候,才会触发。而很多时候,OOM的根源在于,“看上去”内存是够的,但真要用的时候,发现不够。

    除了预估不准以外,一些内存数据结构的扩展,也会触发OOM。比方说,AppendOnlyMap、PartitionedPairBuffer,他们在spill之前,会有一次扩容,在内存中的扩容,目的就是为了承载更多的用户数据。如果扩容的过程中,发现内存不足,那就只能OOM了。因为这里没有Spill的逻辑,纯粹的内存扩容。

    说得不够系统,头上一句、腚上一句,希望对老弟有所帮助~

    2021-10-15 14:01:21

  • 快跑

    2021-04-21 14:59:25

    老师好!
    实例1中
    1、为什么提到“线程池大小设置为 1 是不可取的”?

    2、假如spark.executor.cores设置成 1 ,有3个Task,串行。
    第一个Task执行完成后,360M的内存会全部都释放么?会不会有垃圾还没有回收的情况,导致Task2的内存没有360M可以
    作者回复

    1. 如果并发度是1,单个Executor就是纯粹的串行计算了,Spark“分布式”计算引擎的并行计算就失去了意义。即便还是可以有多个Executors,但是如果每个Executor在同一时间只有一个线程在工作,这种“分布式”计算的效率也是极其低下的。

    2. 好问题,确实有这个隐患。不过呢,因为任务执行涉及的对象都是short-lived,也就是生命周期都很短,不像Storage Memory存储的缓存对象、生命周期很长。因此,执行任务涉及到的对象,多是放在年轻代,Minor GC的触发会频繁一些,回收效率也更高。不像long-lived objects,往往需要触发Full GC才能回收。因此,即便有还未回收的内存,也不影响任务执行,因为Minor GC频繁、且轻量,能够相对及时地把内存回收。

    2021-04-21 23:37:34

  • 苏子浩

    2021-04-27 01:22:24

    老师,我想问一下,数据膨胀导致 OOM 的例子中,一定会出现OOM吗?既然Task1 能获取到 300MB 的内存空间,那么挂起Task2线程和Task3线程,等待Task1内存释放,然后依次完成3个Task呢?
    作者回复

    你说的对,如果Spark可以保证依次运行这3个Tasks,确实不会OOM。不过,这里的关键是“依次”,当Executors线程池大于1的时候,Spark不能保证“依次”处理task。Task确实有先来后到,但是要是严格保证“依次”可就难了。

    把spark.executor.cores置为1,就是咱们这讲举的极端的例子,Spark可以严格保证“依次”,不过就像咱们说的,它其实已经失去并行计算的意义了。

    2021-04-27 16:49:05

  • Unknown element

    2022-01-04 15:18:30

    老师您好,我有几个问题:
    (1)文章中举的例子,“每个 core 有两个线程”是怎么设置的?spark.task.cpus=0.5吗?
    (2)我看官方文档对于spark.executor.cores的定义是The number of cores to use on each executor. 现在spark.executor.cores=3但是一个机器上只有两个core,那这时候创建executor的资源好像不够?另外您说spark.executor.cores=1就失去并行的意义了,但是我们目前spark.executor.cores就设置的是1(捂脸)运维说这是经过慎重考虑的默认参数,在 https://mp.weixin.qq.com/s/gNxQKTH9JkNsDaBKttvAsQ 这篇文章的 06 规范优化 => 03 参数滥用问题 有提到,不知道老师对这个设置怎么看呢?
    (3)您对 To_Drill 和 狗哭 两位同学的问题回答感觉有一点矛盾呢~在 狗哭 的问题的回答中您说 “task申请不到额外内存,不得不进入waiting list,等待别的task把内存释放,这个时候,CPU也会挂起”,也就是说task开始计算之后发现内存不够但是又申请不到额外内存就会被挂起,但是在 To_Drill 的问题的回答中您说 “task为了容纳整个数据分片,需要不停地申请内存,如果内存不够,任务不能再挂起了,因为挂起来,内存也不能释放,别的task也进不来,挂起没有意义,所以只能硬着头皮往下执行,直到把内存撑爆为止”,意思应该是task开始计算之后发现内存不够但是又申请不到额外内存这时就直接抛oom?不知道是不是我没理解对...
    谢谢老师!!
    作者回复

    好问题,老弟看的非常仔细、认真,赞一个先👍,咱们一个个说
    1)是的,如果一个core,要跑两个task,就设置0.5,不过这个需要每个core切实可以超线程到至少两个线程,物理上才可行
    2)先说结论,spark.executor.cores设置为1,是没问题的,只要Executors数量足够多,也可以充分利用硬件资源,做到并行计算。我又回顾了下原文,原文的说法,确实有问题,漏了一句话。就是在咱们OOM数据倾斜的例子里,本意是说,在一个物理机的那一个Executor中,把spark.executor.cores设置为1,那么那个Executor就从并行计算,退化到串行计算,因为只有一个cpu嘛。但是,实际上一个物理机,可以起多个Executors,每个Executors只有一个cpu,也是OK的。只不过我们这里,为了说明OOM的root cause,把场景和问题都简化了,就假设一个物理机,只起一个Executor,串行起来的话,就不会有倾斜导致的OOM问题。总结来说,在实际工业部署中,一个集群,多个Executors,每一个Executors一个CPU core,是完全OK的。
    3)我看了下,这里确实有矛盾的地方,可能当时回答“狗哭”同学的时候,比较草率了。这里double confirm下,给“To_Drill”的答复是正确的哈,参考后者的答复即可~ 挂起,只存在于task启动前(拿不到N/2),一旦拿到N/2内存,task即开始执行,执行过程中,如果内存申请持续得不到满足,就抛OOM

    2022-01-09 17:52:33

  • Fendora范东_

    2021-04-21 12:57:36

    1.task1首先运行的,拿到自己1/N发现还不够,就继续申请内存。task2/task3后面运行,发现可用内存满足下限,就跑去运行了,结果task1抢先申请到300M,task2,task3在运行时需要更多内存,不能得到满足,导致OOM。
    2.driver端oom:
    遇到过执行查询SQL,结果集太大,oom。通过调maxResultSize大小来解决。
    executor端oom:
    之前没细究过,oom了就给executor调大内存就完了。。。。😅
    作者回复

    1. 不完全对,task1之所以能拿到,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。task2、task3不行,是因为他们来得晚,他们到的时候,N至少等于2了,所以“每人”最多拿执行内存的一半,所以肯定会OOM。这里其实有点tricky,主要想考大家对于(1/N/2,1/N)的理解是不是足够透彻。

    2. 对,driver OOM调整maxResultSize比较典型。学过这一讲,Executors内存调优可以做得更细致一些了,就不必每次都是单纯调大executor.memory~

    2021-04-21 23:53:49

  • To_Drill

    2021-11-23 18:52:38

    老师,对下边这个描述我有个疑问,既然内存是陆续分配的,申请不到内存时会挂起,那些OOM的case为啥没有挂起呢?
    2)再一个,就是随着task的进展,task对于内存的持续申请,得不到满足,注意,是持续申请,Spark根据task的计算需要,陆续给task分配内存,并不是一下子就提前allocate一定量的内存,这个是很多同学困惑的地方~ 就是大家一开始可能申请的都不多,但是随着各自task计算的进展,大家申请的内存量会陆续增大,慢慢的,就会出现有些task申请不到额外内存,不得不进入waiting list,等待别的task把内存释放,这个时候,CPU也会挂起,造成CPU资源的浪费
    作者回复

    好问题,是申请不到M/N/2,会被挂起,一旦这个初始值达到了,task就会唤醒执行。

    执行过程中,根据任务和数据量需要,会不停地申请内存,当然,中间可以利用spill机制,来做外排。但是,像AppendOnlyMap,PartitionPairBuffer这种内存数据结构,在内存有需要时,他们会翻倍地扩张;还有一些任务,比如Hash Join的build阶段,hash table是需要在内存中构建的。

    这样的话,task为了容纳整个数据分片,需要不停地申请内存,如果内存不够,任务不能再挂起了,因为挂起来,内存也不能释放,别的task也进不来,挂起没有意义,所以只能硬着头皮往下执行,直到把内存撑爆为止

    2021-11-28 11:28:48

  • 2021-11-11 23:25:47

    老师好,关于苏子浩的提问的。。。。。(ii)同时,实例二:数据膨胀例子中,“task1之所以能拿到300MB,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。“那么task1实际是拿到了多少内存呢?是300MB还是360MB?是按需分配还是给360MB?如果是分配了300MB,那么此时“动态执行内存总量”变成了多少呢,是60MB吗?那么此时Task2进入时,假设Task3还没有进入,N等于2了,所以Task2分配到的执行是60 / 2 = 30MB吗?
    我的理解是这样的,,,tast1 分走了 300M,tast2 进来了,此时N=2,我觉得应该是 task1 和task2 来分一共的 360M (两个任务最大的上限(360/2)180M),此时task2只需要100M,那么task2可以运行,task1 应该只能分到360-100等于260M,但是因为task1 本身需要 300,所以oom,,,, 不理解 为什么task2是分到60/2=30M
    作者回复

    老弟说的对,这里60 / 2 = 30MB是不对的,应该是(360/2)= 180M,就是说,在算每个Task能够分配的内存时,分子应该是“动态执行内存总量”,而不是当前的Free Memory,这里需要更正下,感谢老弟提醒~

    接下来,咱们来分析一下这个过程。Task2进来之后,每个Task,在理论上,最少拿到90M,最多拿到180M。但是这个时候,Task1已经占据了300M,Free Memory还剩60M。

    在这种情况下,Task2会被挂起,等待Task1释放内存,直到Free Memory超过90M,这个时候Task2恢复执行,但是,Task2需要300M的内存(不是你说的100M哈,要考虑膨胀系数),随着Task2的推进执行,需要消耗的内存越来越多,但是系统提供不了Task2所需的内存消耗,因而最终会OOM~

    2021-11-12 15:48:33

  • RespectM

    2021-05-30 16:16:03

    老师数据膨胀如何监控,怎么看是否是数据膨胀导致的oom,还是shuffle的时候netty导致的堆外内存oom?
    作者回复

    首先,这两种OOM的报错信息不同哈,前者一般是Not enough heap memory,也就是堆内空间不足;而后者一般会报overhead不够,所以仅从报错信息,基本就能分辨两者的区别。

    再者,对于数据膨胀的监控,可以考虑利用Spark UI的一些常规指标,比如说Stage详情页面的Shuffle spill (memory)和Shuffle spill (disk),它们标记了同一份数据在内存与磁盘中的大小,通过观察两者的差异,可以量化对比数据在内存中的膨胀系数,也就是:
    Memory Expansion Rate ≈ Shuffle spill (memory) / Shuffle spill (disk)。

    2021-05-30 23:49:20

  • 苏子浩

    2021-04-27 10:58:06

    老师,您好。关于文中在“case1: 数据倾斜”部分提到的“针对以这个案例为代表的数据倾斜问题,我们至少有 2 种调优思路:1. 消除数据倾斜,让所有的数据分片尺寸都不大于 100MB;2. 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB”。我有两点疑问:
    (1)这里提到的数据分片尺寸100MB是怎么定的呢?为什么不是120MB呢?本例子中Executor 线程池大小为 3,每个 Reduce Task 最多可获得 360MB / 3 = 120MB 的内存空间。以及关于留言关于“spark.reducer.maxSizeInFlight“的回复:”这部分buffer也算作Execution Memory的一部分,也会记到Execution Memory的“账上”。“那么保证数据分片尺寸低于120MB或者100MB此时有效吗?

    (2)调整 Executor 线程池、内存、并行度等相关配置,提高“ 1/N ”上限到 300MB,这里为什么是“1/N”而不是14讲中的“动态实时变化的 Execution Memory / N~”呢?文中所举的例子“维持并发度、并行度不变,增大执行内存设置,提高 1/N 上限到 300MB”,这里我的理解变化的是Execution Memory ,和“1/N”这个本身有关系吗?
    谢谢老师!
    作者回复

    好问题,我们一个一个来说。

    1) 你说的没错,线程池大小为3的话,确实每个Task最多可以拿到120MB的内存。咱们原文里面建议分片大小100MB,目的其实就是留点富余,不要满打满地真的把分片大小弄到和上限一样大。

    留富余的目的,其实就是为了任务执行的稳定性和可靠性。首先,这里面有你说的因素,也就是其他环节也都需要内存;再者,即便其他环节不消耗内存,我们也得留下富余。原因是,对于任务调度来说,作为开发者,我们不能直接掌控,任务总有先来后到,在N~ < N的时候,我们用N=3去计算每个Task分得的内存大小,就会有偏差,这个咱们原文也有讲。所以,我们在考虑每个分片数据量大小的时候,要尽量留些富余出来。

    2)我们之所以用N去计算、去预估,有两个原因。第一个很简单,N~是不确定的,我们只知道N~ < N,但不知道N~具体是多少。再者,虽然由于调度系统的缘故,Task有先来后到,在任务调度初期N~确实小于N,但是随着执行的进行,N~总会追上来、等于N,而且这个过程其实很快。也就是在大部分时间,N~都是等于N的。另外,在你说的case里,只有增大Execution Memory,相应的1/N也才能跟着变大呀 😃

    2021-04-29 14:05:15

  • Fendora范东_

    2021-04-21 12:50:22

    有个疑问
    collect是把executor所有数据全部拉回来,但是他是一次性全放到driver端吗?如果是,那fetchsize的大小只是为了分批输出吗,是不是调的越大输出就越快了;如果不是,那就不应该出现driver端oom吧
    按照文中分析来看,collect就是把所有数据一次性拉到driver端了吧?
    作者回复

    collect是全量拉,你说的fetch size,我理解是单批次拉取的最大数据量,是用来限制网络吞吐的,因此这个参数和Driver是否OOM没有关系。Driver能承受的全量数据大小,由maxResultSize参数来决定,如果你拉的全量数据,大于maxResultSize,就会OOM。

    2021-04-21 23:59:33

  • 骨汤鸡蛋面

    2022-05-05 17:32:42

    老师,我在公司做AI云平台相关工作,AI云平台的工作流里面有很多spark 任务,这个任务可能跑一天的数据,也可能跑一个月的,但是算法工程师不会调节 spark 参数,就导致 任务的运行时间不可控,所以,我们想做一个 配置建议的工具,告诉算法人员这次提交任务,cpu 和 mem 最好配多大。请问下,spark 有人开展类似工作嘛?
  • 毕务刚

    2021-11-25 16:17:38

    老师好:

    将HDFS的json文件导入到hive表时,出现java.lang.OutOfMemoryError: GC overhead limit exceeded,
    并且在log中,可以看到WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory

    代码很简单
    spark.read.json("**").write.mode("overwrite").saveAsTable("**")

    这个问题是哪部分内存出现了问题呢?
    如果不增加内存, 还可以如何优化?
    作者回复

    消耗的是Execution Memory,从报错上看,可以把先把Executors内存调大,看上去现在只有1GB,比较小。

    另外,问下老弟JSON文件平均多大?或者说,JSON文件的分片多大?分片最好不要超过1GB,128M、256M为宜。另外,还有个问题,每个JSON字符串大概多大?因为JVM里面,是需要给每个JSON串构建对象的,如果JSON串本身很大,也会有OOM隐患。

    所以回答老弟的问题,如果不动内存,那就只好对源文件下手了,把数据分片打散一些,不过还是那句话,如果一个JSON结构本身就很大,这个就比较麻烦,毕竟对象的构建,是以一个JSON结构为粒度的

    2021-11-28 10:22:10

  • 狗哭

    2021-11-04 09:41:16

    老师您好,接着上面苏子浩的第二问的回答我有点不太明白了。前面cpu那一讲线程挂起这个问题里面有这样一段:M 的下限是 Execution Memory 初始值,上限是 spark.executor.memory * spark.memory.fraction 划定的所有内存区域。那这样的话task2进来时给它的内存不应该是(360/2/2,360/2)吗?如果是60/2的话,那线程挂起只有一种情况了,就是这个线程一点内存也没抢占到,也不存在因为分配的内存大于0小于M/N/2而挂起这种情况了吧。请老师指点
    作者回复

    可能原文我没说清楚哈~ CPU挂起有两种情况:
    1)一个就是底线不满足:也就是连M/N/2的内存都申请不到~
    2)再一个,就是随着task的进展,task对于内存的持续申请,得不到满足,注意,是持续申请,Spark根据task的计算需要,陆续给task分配内存,并不是一下子就提前allocate一定量的内存,这个是很多同学困惑的地方~ 就是大家一开始可能申请的都不多,但是随着各自task计算的进展,大家申请的内存量会陆续增大,慢慢的,就会出现有些task申请不到额外内存,不得不进入waiting list,等待别的task把内存释放,这个时候,CPU也会挂起,造成CPU资源的浪费

    2021-11-05 16:38:44

  • Unknown element

    2021-10-25 09:57:55

    老师 问下数据倾斜的第二种解决方法“维持并发度、执行内存不变,使用相关配置项来提升并行度将数据打散,让所有的数据分片尺寸都缩小到 100MB 以内”感觉不能解决本质问题吧,发生数据倾斜应该是因为某个分区数据量太大了,但是分区不管打的多散,这些数据在reduce阶段都会汇集到一个节点吧?
    作者回复

    是的,你说的没错,这种做法,本质上会缓解因为并行度设置问题而引入的倾斜问题(比如并行度过低,Hash过后大部分数据被划分到少量分区之上)。对于那些少数id带来的绝对倾斜问题,纯靠调整并行度是没用的~

    这个时候,要么用AQE的自动倾斜处理,要么手工加盐,消除倾斜。

    2021-10-26 23:09:44

  • Sean

    2021-08-29 16:08:09

    磊哥好,磊哥上文提到的案例中采用默认配置,源码里面是这样描述的:
    "The region shared between execution and storage is a fraction of (the total heap space - 300MB) configurable through `spark.memory.fraction` (default 0.6). The position of the boundary within this space is further determined by `spark.memory.storageFraction` (default 0.5). This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default"
    1.总的堆内存没有提到user memory,只有storage + execution + reserved
    2.默认配置fraction:0.5,storagefraction:0.6,storage=(900-300)*0.3=180m,execution=(900-300)*0.6*(1-0.5)=180m
    3.没有缓存这里理解为没有使用到storage memory缓存RDD,广播变量等数据,为0M,则execution可以额外占用180M,xecution = 360M
    4.反推900M= reserved300m+storage180M+execution180M+240M,可以理解为240M是预留的给user memory的内存吗?
    作者回复

    就按照文中给出的公式计算就好了,user memory一定是有的~ JVM堆内就分成了4部分,Reserved、User、Execution、Storage。你可以再翻翻其他源码~

    2021-09-06 22:54:20