14 | CPU视角:如何高效地利用CPU?

你好,我是吴磊。

在日常的开发与调优工作中,总有同学向我抱怨:“为什么我的应用CPU利用率这么低?偌大的集群,CPU利用率才10%!”确实,较低的CPU利用率不仅对宝贵的硬件资源来说是一种非常大的浪费,也会让应用端到端的执行性能很难达到令人满意的效果。那么,在分布式应用开发中,我们到底该如何高效地利用CPU?

我们说过,性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡,让硬件资源达到一种平衡、无瓶颈的状态。对于CPU来说,最需要协同和平衡的硬件资源非内存莫属。原因主要有两方面:一方面,在处理延迟方面,只有内存能望其项背;另一方面,在主板上内存通过数据总线直接向CPU寄存器供给数据。因此,理顺它们之间的关系,可以为性能调优奠定更好的基础。

那么,今天这一讲,我们就从硬件资源平衡的角度入手,去分析CPU与内存到底该如何合作。

CPU与内存的平衡本质上是什么?

我们知道,Spark将内存分成了Execution Memory和Storage Memory两类,分别用于分布式任务执行和RDD缓存。其中,RDD缓存虽然最终占用的是Storage Memory,但在RDD展开(Unroll)之前,计算任务消耗的还是Execution Memory。因此,Spark中CPU与内存的平衡,其实就是CPU与执行内存之间的协同与配比。

要想平衡CPU与执行内存之间的协同和配比,我们需要使用3类配置参数,它们分别控制着并行度、执行内存大小和集群的并行计算能力。只有它们设置得当,CPU和执行内存才能同时得到充分利用。否则CPU与执行内存之间的平衡就会被打破,要么CPU工作不饱和,要么OOM内存溢出。

想要知道这3类参数都包含哪些具体的配置项,以及它们到底是怎么配置的,我们需要先弄清楚一些基础知识,也就是并行计算的线程之间是如何瓜分执行内存的。为了帮助你理解,我先来给你讲个故事。

黄小乙的如意算盘:并行计算的线程如何瓜分执行内存?

还记得地主招租的故事吗?与张麻子签订占地协议之后,黄小乙就开始盘算,自己分得的那块田地怎么经营才最划算。

他心想:“这么一大块地,我亲自种肯定划不来。一,我没有张麻子勤快;二,不管是种棉花还是咖啡都很耗时、费力,面朝黄土背朝天,我可耽误不起那功夫!不如,我把土地转让出去,让别人来种,我只管收购、销售,赚到的差价也够我吃穿不愁了!”

于是,他打定主意,贴出了一张告示。

告示贴出去不到三天,十里八村的人都赶来承让土地,他们大部分都是吃苦耐劳的庄稼汉,一心想凭借这次机会改善生活,所以每个人恨不能把500顷的田地全都承让过来。

黄小乙见状,心中大喜,认为不仅自己的土地很快就可以被种满,还能名正言顺地去抢占张麻子的那块地。不过,也不能光图规模,还要确保棉花、咖啡的产出质量,更重要的是得想个办法让这种运作模式可持续。

于是,黄小乙追加了一项补充条款:“鉴于老乡们参与热情高涨,公平起见,我又制定了新的土地转让规则:首先,每位老乡能够获得的土地面积有上下限,它的具体数值由可耕种土地总面积和申请人数共同决定;其次,土地转让权的有效时间与农作物生长周期一致,一旦作物丰收,承让人需让出土地,如有意愿再次耕种需重新申请。

比如说,现阶段可耕种土地总面积已由500顷扩张为800顷(这是黄小乙就抢占了张麻子的地之后的土地总面积),如果有400位老乡申请土地转让权,那么每位老乡最高可得2顷(800/400)的土地,最低可得1顷(800/400/2)土地。也就是说,如果老乡人数为N,那么每位老乡能够获得的土地面积会在(1/N/2,1/N)之间浮动。

这个规定大伙儿都心服口服,没过多久,800顷土地就全部转让完了。一笔多赢的买卖让大伙都能各取所需,也让老谋深算的黄四郎都不禁心挑大指,感叹道“真是长江水后浪催前浪,一代新人换旧人!”

好啦,故事到这里暂时告一段落,但是黄小乙这份如意算盘和今天要讲的内容有什么关系呢?

我们讲过,黄小乙租赁的土地类比的是内存区域中的Execution Memory。在今天的故事里,黄小乙招募的棉农和咖啡农对应的就是,Executor线程池中一个又一个执行分布式任务的线程。土地出让规则对应的就是,任务并发过程中多个线程抢占内存资源时需要遵循的基本逻辑。

那么,执行内存抢占规则就是,在同一个Executor中,当有多个(记为N)线程尝试抢占执行内存时,需要遵循2条基本原则:

  • 执行内存总大小(记为M)为两部分之和,一部分是Execution Memory初始大小,另一部分是Storage Memory剩余空间
  • 每个线程分到的可用内存有一定的上下限,下限是M/N/2,上限是M/N,也就是均值

三足鼎立:并行度、并发度与执行内存

理清了线程与执行内存的关系之后,我们再来说说与并发度、执行内存和并行度这三者对应的3类配置项分别是什么,以及它们如何影响CPU与计算内存之间的平衡。

3类配置项

我们讲过,并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。

并行度可以通过两个参数来设置,分别是spark.default.parallelism和spark.sql.shuffle.partitions。前者用于设置RDD的默认并行度,后者在Spark SQL开发框架下,指定了Shuffle Reduce阶段默认的并行度。

那什么是并发度呢?我们在配置项那一讲提到过,Executor的线程池大小由参数spark.executor.cores决定,每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定。两者相除得到的商就是并发度,也就是同一时间内,一个Executor内部可以同时运行的最大任务数量。又因为,spark.task.cpus默认数值为1,并且通常不需要调整,所以,并发度基本由spark.executor.cores参数敲定

就Executor的线程池来说,尽管线程本身可以复用,但每个线程在同一时间只能计算一个任务,每个任务负责处理一个数据分片。因此,在运行时,线程、任务与分区是一一对应的关系

分布式任务由Driver分发到Executor后,Executor将Task封装为TaskRunner,然后将其交给可回收缓存线程池(newCachedThreadPool)。线程池中的线程领取到TaskRunner之后,向Execution Memory申请内存,然后开始执行任务。

如果我们把棉农、咖啡农类比CPU线程,那么TaskRunner就可以理解为锄具,Task要处理的数据分片可以理解为作物种子。有了锄具和种子之后,老乡们得去黄小乙那儿申请块地,才能开始耕种。

最后,我们再来说说执行内存。黄小乙的地就是执行内存,堆内执行内存的初始值由很多参数共同决定,具体的计算公式是:spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction)。相比之下,堆外执行内存的计算稍微简单一些:spark.memory.offHeap.size * (1 - spark.memory.storageFraction)。

除此之外,在统一内存管理模式下,在Storage Memory没有被RDD缓存占满的情况下,执行任务可以动态地抢占Storage Memory。因此,在计算可用于分配给执行任务的内存总量时,还要把有希望抢占过来的这部分内存空间考虑进来。这也是为什么黄小乙的可耕种土地总面积,会从最开始的500顷逐渐扩展到800顷。

由此可见,可分配的执行内存总量会随着缓存任务和执行任务的此消彼长,而动态变化。但无论怎么变,可用的执行内存总量,都不会低于配置项设定的初始值

好啦,搞明白并行度、并发度和执行内存的概念,以及各自的配置项之后,我们再通过两个经常影响CPU利用率的例子,来说说它们是怎么影响CPU与计算内存之间的平衡的,由此总结出提升CPU利用率的办法。

CPU低效原因之一:线程挂起

在给定执行内存总量M和线程总数N的情况下,为了保证每个线程都有机会拿到适量的内存去处理数据,Spark用HashMap数据结构,以(Key,Value)的方式来记录每个线程消耗的内存大小,并确保所有的Value值都不超过M/N。在一些极端情况下,有些线程申请不到所需的内存空间,能拿到的内存合计还不到M/N/2。这个时候,Spark就会把线程挂起,直到其他线程释放了足够的内存空间为止。

你可能会问:“既然每个线程能拿到的内存上限是M/N,也就是内存总量对线程数取平均值,为什么还会出现有的线程连M/N/2都拿不到呢?这在数学上也不成立呀!”这是个好问题。这种情况的出现,源于3方面的变化和作用:

  • 动态变化的执行内存总量M
  • 动态变化的并发度N~
  • 分布式数据集的数据分布

首先,动态变化的执行内存总量M我们刚刚已经说过了。M的下限是Execution Memory初始值,上限是spark.executor.memory * spark.memory.fraction划定的所有内存区域。在应用刚刚开始执行的时候,M的取值就是这个上限,但随着RDD缓存逐渐填充Storage Memory,M的取值也会跟着回撤。

另外,到目前为止,(1/N/2,1/N)上下限的计算我们用的都是线程总数N,线程总数N是固定的。N的取值含义是一个Executor内最大的并发度,更严格的计算公式是spark.executor.cores除以spark.task.cpus。但实际上,上下限公式的计算用的不是N,而是N~。N~的含义是Executor内当前的并发度,也就是Executor中当前并行执行的任务数。显然N~ <= N。

换句话说,尽管一个Executor中有N个CPU线程,但这N个线程不一定都在干活。在Spark任务调度的过程中,这N个线程不见得能同时拿到分布式任务,所以先拿到任务的线程就有机会申请到更多的内存。在某些极端的情况下,后拿到任务的线程甚至连一寸内存都申请不到。不过,随着任务执行和任务调度的推进,N~会迅速地趋近于N,CPU线程挂起和内存分配的情况也会逐渐得到改善。

就像黄小乙的补充条款中举的那个例子一样,当可耕种土地总面积为800顷的时候,如果有400位老乡申请土地转让权,那么每位老乡最多可得800/400=2顷土地,最低可得800/400/2=1顷土地。

但如果这400位老乡不是同时来的,而是分两批来的,每批来200人的话,就会出现问题。按照他的规则,先来的这200位老乡,每人最多可得800/200 = 4顷土地。咱们前面说了,每个申请的老乡都想通过这次机会发点小财,于是这200位老乡每人都申请了4顷地,黄小乙的地一下子就被分光了!后来的200位老乡就没地可种了,他们只能等到第一批老乡的棉花和咖啡丰收了,再重新一起申请土地转让权。

假设第一批老乡同时大丰收,按照黄小乙转让规则的第一条,第一批老乡要交出土地使用权,如果想要继续耕种的话,就得和第二批老乡一起重新申请。在这种情况下,上下限的计算才是黄小乙最开始举例的那种算法。

第三个影响任务并发度和内存分配的因素,是分布式数据集的分布情况。在刚才的例子中,如果第一批老乡每人只申请2顷土地,那么第二批老乡来了之后依然有地可种。每人申请多大的土地,取决于他手里有多少农作物种子,我们之前把每个Task需要处理的数据分片比作是作物种子,那么,数据分片的数据量决定了执行任务需要申请多少内存。如果分布式数据集的并行度设置得当,因任务调度滞后而导致的线程挂起问题就会得到缓解

CPU低效原因之二:调度开销

线程挂起的问题得到缓解,CPU利用率就会有所改善。既然如此,是不是把并行度设置到最大,每个数据分片就都能足够小,小到每个CPU线程都能申请到内存,线程不再挂起就万事大吉了呢?

当然不是,并行度足够大,确实会让数据分片更分散、数据粒度更细,因此,每个执行任务所需消耗的内存更少。但是,数据过于分散会带来严重的副作用:调度开销骤增。

对于每一个分布式任务,Dirver会将其封装为TaskDescription,然后分发给各个Executor。TaskDescription包含着与任务运行有关的所有信息,如任务ID、尝试ID、要处理的数据分片ID、开发者添加的本地文件和Jar包、任务属性、序列化的任务代码等等。Executor接收到TaskDescription之后,首先需要对TaskDescription反序列化才能读取任务信息,然后将任务代码再反序列化得到可执行代码,最后再结合其他任务信息创建TaskRunner。

因此你看,每个任务的调度与执行都需要Executor消耗CPU去执行上述一系列的操作步骤。数据分片与线程、执行任务一一对应,当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据量却少之又少,就CPU消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与之分庭抗礼。显然,在这种情况下,CPU的有效利用率也是极低的。

如何优化CPU利用率?

你可能会说:“这也太尴尬了,并行度低了不行,容易让CPU线程挂起;高了也不行,调度开销太大,CPU有效利用率也不高。高也不行、低也不行,那我该怎么办呢?”

咱们不妨来算笔账。我们还是拿黄小乙的如意算盘来举例,如果400个老乡同时来申请他的800顷地,那么每个老乡能分到1到2顷土地不等。相应地,每位老乡需要购买的种子应该刚好够种满1到2顷地。因为,买多了种不下,买少了还亏。假设洼子村农产品交易市场的种子总量刚好够种1000顷地,从卖家的视角出发,这些种子应该售卖1000/2 =500到1000/1 = 1000次,才能赚到最多的钱。

因此,在给定Executor线程池和执行内存大小的时候,我们可以参考上面的算法,去计算一个能够让数据分片平均大小在(M/N/2, M/N)之间的并行度,这往往是个不错的选择

总的来说,对CPU利用率来说,并行度、并发度与执行内存的关系就好像是一尊盛满沸水的三足鼎,三足齐平则万事大吉,但凡哪一方瘸腿儿,鼎内的沸水就会倾出伤及无辜。

小结

今天这一讲,我们从CPU与执行内存平衡的角度,通过梳理Executor并行度、并发度和执行内存之间的关系,以及它们对CPU利用率的影响,总结出了有效提升CPU利用率的方法。

首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。

其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升CPU利用率。

最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。

每日一练

  1. 从Executor并发度、执行内存大小和分布式任务并行度出发,你认为在什么情况下会出现OOM的问题?
  2. 由于执行内存总量M是动态变化的,并发任务数N~也是动态变化的,因此每个线程申请内存的上下限也是动态调整的,你知道这个调整周期以什么为准?

期待在留言区看到你的思考和答案,如果你的朋友也在为提高CPU利用率苦恼,欢迎你把这一讲转发给他,我们下一讲见!

精选留言

  • Geek_d794f8

    2021-04-16 13:33:07

    老师,在考虑并行度,内存,线程数三者之间的平衡时,spark.sql.shuffle.partitions的值是shuffle的reducer阶段的并行度,那么对于从数据源读取(比如读hive表)这个起始的map阶段的并行度是否需要考虑?这个阶段spark底层有某种默认的切片规则吗,需要在代码中人为的干预吗(比如coalesce)? 我使用的是DataFrame和DataSet的api。
    作者回复

    都需要考虑,结合你这个例子,整体逻辑是这样的。

    首先,Spark读取分布式文件,获取数据源,这个时候,并行度就是文件在分布式文件系统上的并行度,比如HDFS、比如S3。HDFS的分片可能是128M或是256M,那么它的并行度,就取决于文件总大小和分片大小的商。

    这个时候,由于分片大小是固定的,你可以结合分片大小,去设置执行内存和并发度(executor线程池),让他们满足(1/N/2,1/N)的关系。

    然后,你设置的spark.sql.shuffle.partitions,会控制Joins之中的Shuffle Reduce阶段并行度,这个参数设置多少。其实取决于你Shuffle过后每个Reduce Task需要处理的数据分片大小。由于你之前是按照128M或是256M来设置的执行内存, 和并发度。这个时候,在设置spark.sql.shuffle.partitions这个值的时候,只要保证数据分片大小还是在128M或是256M左右(shuffle前可能有过滤、过程中还会有聚合,所以原来的并行度就不合适了),就依然能维持“三足鼎立”的计算平衡。

    所以说,核心是维持这个平衡。在你这个case,核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。

    在最开始,“定下来的”是并行度,因为这个是文件系统决定的,但是执行内存和并发度未定,所以你可以相应地去调整这两类参数。

    后面Shuffle的时候,执行内存和并发度已经定下来了,但是spark.sql.shuffle.partitions未定,你再结合那个公式,去定这个参数的值就好了。思路就是这么个思路,其实还是万变不离其宗。

    2021-04-16 17:47:12

  • zxk

    2021-04-17 21:05:36

    问题一:并发度决定了数据分片的大小:
    - 在每个线程都分配到了最大内存,即 M/N 的内存时,如果 task 还需要更多的内存,那么就会发生 OOM。
    - 在每个线程都分配到了最少内存,即 M/2N的内存时,如果 task 还需要更多的内存,此时又没有其他线程释放内存供其使用,那么也会导致OOM。
    作者回复

    没问题,满分💯 ~

    2021-04-18 23:43:55

  • qconljk

    2021-04-14 19:26:23

    首先,在一个 Executor 中,每个 CPU 线程能够申请到的内存比例是有上下限的,最高不超过 1/N,最低不少于 1/N/2,其中 N 代表线程池大小。这个除以2,2代表什么?
    作者回复

    好问题,其实没有什么特别的含义,就是一种公平机制,就是保证至少有均值的1/2可以满足,否则就不进行计算。

    其实你说把它改成3成不成,我觉得也没什么不可以。但是,改成3之后,task最低内存保障更低了,即便你有1/3给它,它也完不成计算,其实还是得挂起。

    1/2的保底,其实更make sense,因为1/2的内存相比均值来说,只差了一半,这样有些任务你先分配了1/2,运行的过程中,其他task还会释放内存,这个时候,这个task还是可以成功执行的。但如果是1/3,你亏空的内存更多,需要等待的概率越大,挂起的概率也就越大。

    2021-04-15 19:46:58

  • 斯盖丸

    2021-04-14 07:04:10

    这讲看得有些迷,想请问下老师如何从UI角度看出来我任务的并行度是否合适呢?
    作者回复

    Spark UI有专门的配置页,记录了不同配置项的设置值,其中有默认并行度设置。

    但重点不在这,重点是协调、平衡“三足鼎立”,也就是并行度、并发度、执行内存,去提升CPU利用率,所以你更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优。

    2021-04-14 18:05:59

  • wow_xiaodi

    2021-08-02 15:38:14

    最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间
    这里很不解为何内存的大小和分片大小有直接联系,无论是计算过程还是shuffle过程,都是用到一些内存占用较小的数据结构去做的,就算内存不够用,也会有gc去保证。这个公式感觉就是让数据分片大小和执行内存等价了,让所有数据都在待在内存中一次性批处理,而不是处理一部分溢出落盘再继续处理?请老师指正
    作者回复

    好问题~ 是这样,这个公式的目的,主要是让每个Task能够拿到并处理适量的数据,不至于因为数据分布本身,而带来OOM。

    D/P ~ (M/N/2, M/N),也就是数据分片大小,让他与M/N在同一个当量。这样基本能够保证每个Task处理的数据是适量的。

    怎么理解适量呢,就是在消耗一定内存(如AppendOnlyMap)的基础上,有少量的溢出。我们知道,D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。

    整体上是这么个逻辑,不确定我说清楚了没,有问题再讨论哈~

    2021-08-04 14:53:23

  • 小灵芝

    2021-04-30 23:53:50

    “在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间。这样,在运行时,我们的 CPU 利用率往往不会太差。”

    请问老师,这里的M, N, D 都是针对一个executor而言的对吧?
    作者回复

    稍有差别,M、N是针对Executor的,也就是Executor的执行内存M、线程池大小N。

    D和P不一样,它指的是你的分布式数据集,D是数据总量,比如20GB;而P指的是这份数据集的并行度,比如200,那么你的每个数据分片的大小D/P,就是20GB/200 = 100MB。

    如果你的M是2GB,也就是2GB的执行内存,N是20,也就是20个线程,那么这个时候,M/N就是100MB。那么,你的D/P就刚好坐落在(M/N/2, M/N)这个区间里。

    2021-05-01 17:29:24

  • Geek_d794f8

    2021-04-16 13:20:37

    我的理解是每个线程申请的内存上限是M/N,那么当数据分片过少,某个task需要处理的数据量较大,M/N的上限执行内存也不够时,就会出现OOM。
    不知道这么理解对不对?
    作者回复

    对,没错~

    2021-04-16 17:48:25

  • Sampson

    2022-01-19 12:13:46

    老师您好,请教一下,在上文中有提到spill机制可以保护oom 的溢出,这个是怎么判断的呢
    作者回复

    这个需要熟悉shuffle里面,用到的内存数据结构,比如AppendOnlyMap,PartitionedPairBuffer,等等。在Shuffle map task阶段,Spark会利用类似的数据结构,来计算数据,当这些数据结构空间不足的时候,Spark会成倍扩容这些数据结构,但是只会扩容一次。想象一下,如果没有Spill机制,实际上Spark很容易OOM,因为扩容一次之后,也很难容下单个数据分片的全部数据

    2022-01-21 23:39:14

  • 斯盖丸

    2021-05-28 10:24:51

    老师,顺着王天雨同学的问题我接着问。我的executor memory为9G,executor的off heap memory也为9G,executor cores为5个,executor instances为30个。
    以上是我的配置。
    照您的公式,我的数据分片的大小就应该在(9G+9G)/5/2=1.8G到(9G+9G)/5=3.6,即数据分片在(1.8G,3.6G)的范围内吗,那进一步说,我在Spark UI里,找到第一个读取parquet的任务,看shuffle read size这个指标,如果在(1.8G,3.6G)这个区间之内,说明就是可以的,是这样吗?

    感觉这个分片好大哦~~
    作者回复

    好问题,这个公式的作用其实是双向的,也即是给定资源,你可以算出并行度;反过来,找到合适的并行度,你也可以指导Spark集群资源的设定。

    你说的没错,2GB的分片,确实太大了,通常来说,分片大小在200MB左右,是比较合适的,推荐把分片大小设定在这个范围。

    有了分片大小,其实就可以反向指导Spark资源设定了。在你的例子里面,我理解资源是提前设定好了,也就是你说的堆内外分别9G,5个cores,也许这个设定是基于经验、也许是出于直觉。

    如果用200MB分片大小反推过来的话,其实你可以考虑降低Executors的内存配置,或是提高它的CPU cores配置,这样把内存与CPU做到一个均衡配比,相比现在一个task要处理2GB的数据,效果要更好~

    毕竟咱们性能调优的本质,是让硬件资源之间尽量去平衡,调节并行度也好、分片大小也好、各种资源参数也好,实际上都是手段。

    2021-05-30 23:58:12

  • kingcall

    2021-04-14 10:06:12

    M 的下限是 Execution Memory 初始值,上限是 spark.executor.memory * spark.memory.fraction 划定的所有内存区域。这个老师笔误,写错了吧!
    作者回复

    没写错哟,上限就是 spark.executor.memory * spark.memory.fraction ,也就是storage memory + execution memory之和。Unified memory manager,在统一管理模式下,大家是可以互相抢占的,因此,如果没有分布式数据集缓存,storage memory那片内存区域,执行任务是都可以抢过来的,所以上限就是两者之和。

    2021-04-14 18:01:17

  • Unknown element

    2021-12-27 10:11:31

    老师您好 spark在计算过程中会输出类似这样的日志:
    [Stage 1:==================================================>(11765 + 6) / 11769]
    想问下这里的数字是 task 数吗?为什么括号里的两个task数加起来不等于外面task数呢?
    另外这里的task是不是偏多?应该从哪些方面去排查问题呢?
    谢谢老师
    作者回复

    先来说第一个哈
    (11765[Finished] + 6[Ongoing]) / 11769[Total]
    最后一个,11769,是Total tasks,所有tasks的数量;11765,是Finished的tasks数量,包括Failed,6是正在执行的tasks,两边加起来不一样,我理解是Finished里面有Failed,然后重试的

    Tasks多倒不是什么大问题,主要是要结合“三足鼎立”,调整并行度,让CPU、内存、并行度保持一致就行

    2022-01-01 10:20:54

  • 子兮

    2021-11-18 14:53:14

    老师,有没有可能:M/N =45M/3core=15M, 有三个分片 5M ,12M,18M, 理论来讲,18M 的task 会出现oom,三个task 同时执行,但在执行过程中,由于5M 的task 迅速执行完成,使得内存释放,这时18M 在执行时获得45/2=22.5M,没有报oom ?
    2 spark graphx 在执行时每个job 所需要处理的数据量都不同,没有办法实时评估更改,这时的设置应该怎样思考呢?
    作者回复

    先说1,完全有可能~ task对于内存的申请,是随需随用的,并不是一开始就分配M&#47;N&#47;2,所以跑得快的小task结束之后,空闲的内存,就可以留给大的task,当然,这里还要考虑gc的延迟。task数量有变化,Spark就会重新计算M&#47;N,也就是你给的5&#47;2=22.5M,所以18M的task,就不会OOM~

    问题2的话,这个比较tricky,这个咱们很难on the fly地再去做调整,比较行之有效的办法,就是在最开始,结合原始数据量大小(内存中的大小),再结合CPU、内存配置,按照“三足鼎立”的思路,去设置

    2021-11-23 18:09:07

  • 静心

    2021-10-16 11:37:29

    老师,这一讲太理论范了,能不能举一些实际的例子,供大家学习加深理解,谢谢老师
    作者回复

    确实~

    不过资源这几讲,都偏运行时一些,比较难用直观的方式做演示,更多的是理论上的指导和方法论~ 需要大家结合日常的实战,多实践、多反思、多思考

    2021-10-18 13:14:55

  • 静心

    2021-10-08 09:00:29

    老师关于cpu线程挂起,描述的是有cpu资源而没执行内存使用而挂起的情况,主要还是动态变化的M(执行内存)与动态变化N(当前使用的cores)的问题,文中提到线程申请内存的大小也是通过M和N计算的。但文中也提到和分布式数据集的数据分布也有关系,这里我不太理解,线程可使用执行内存大小是通过M和N计算的,这和分布式数据集的数据分布有什么关系呢?
    作者回复

    我们一直在推荐:
    D/P ~ (M/N/2,M/N)这样一个最具配比公式,其中~线,表示左右两边在一个数量级。

    但这里有个地方需要特别注意,就是Task的内存分配,是on demand的,就是随着计算的需要,而不断地分配内存,而内存的上下限,由(M/N/2,M/N)划定。

    所以,执行内存的消耗,一定是跟数据集(以及并行度)有关的,和M、N的关系就更大了。不妨从需求和供给的角度,去理解这个公式。

    左边的D/P,是需求,也就是一个Task需要处理的数据量,对应着的,就是内存需求;

    而右边的(M/N/2,M/N),是系统可以提供的per Task的内存供给,也就是说,系统层面,资源层面,能够给每个Task提供多少内存资源。

    需求与供给,一定要匹配或者说一致,才能达到一种平衡的状态,也就是我们一直强调的“无瓶颈”的状态。

    2021-10-12 23:01:17

  • Sean

    2021-08-28 18:26:20

    老师提到的从分片大小200M反推,来配置spark.sql.shuffle.partitions,spark.executor.memory,spark.executor.cores参数,不知道理解的对不对,在不考虑数据压缩的情况下,比如有1T的数据,那么按200分片考虑,最佳并行度计算按1048576M/200M≈5243计算吗,set spark.sql.shuffle.partitions=5243,好像还没有get到这个资源计算公式的应用,
    我结合这几个配置推导方式如下:--executor-cores 4 --executor-memory 6g --num-executors 80
    D/P => (M/N/2,M/N) => (6*80/4/2,6*80/4) => (60G,120G) 这个配置显然不合适,修改一下参数--executor-cores 1500 得到:
    D/P => (M/N/2,M/N) => (6*80/1500/2,6*80/1500) => (164M,327M) 看着这个配置算是合理的利用cpu吗
    根据这个配置得到的区间在继续推导:
    如果数据总量D=1T≈1048576M,那并行度P:spark.sql.shuffle.partitions的大小在这(1048576/164,1048576/327)=>(6394,3207)区间,感觉这个并行度好像太大,也不是很合理,不知道我理解的对不对
    作者回复

    D/P => (M/N/2,M/N) :
    (6/4/2 ,6/4),这里没有--num-executors 80什么事,这个公式是per executor来计算的,不能把executors个数都加上,建议再读读原文哈~

    而且,实际上这里也不能按6g来算,应该是Storage + Execution两部分内存才对,具体算法,你可以再看看文稿~ 6g是所有的内存,是Reserved + User + Storage + Execution。

    2021-09-06 22:57:58

  • Geek

    2021-05-17 22:11:29

    吴老师 请教2个问题哈
    1、当一个源文件的大小超过了M/N 的内存时,这种情况是不是会报OOM?
    2、spark在加载parquet+snappy压缩文件时,它会考虑解压之后的文件大小吗?
    作者回复

    好问题~

    我们先说第二个问题,Spark考虑的,不是数据在磁盘中的存储大小,是内存中的存储大小,所以恰恰是Parquet+Snappy解压后、在内存中的存储大小~

    再来说第一个,源文件通常指的是磁盘中的文件,因此我们也要计算它加载进内存的存储大小。现在假设加载进内存的大小是X,它比M/N大。这个时候,通常来说,Spark并不会立即内存溢出OOM,因为它有Spill机制做保护,这部分细节可以参考Shuffle原理那一讲。Spill机制保证在Task内存不足的情况下,把内存中的数据排序并溢出到临时文件,因此,当X稍大于M/N的情况下,Spark的Spill机制可以避免OOM的发生。

    但是,当某个Task的X远大于M/N,或是所有的Task的X都超过了M/N,这个时候,OOM的概率会陡然上升。

    为了有效降低OOM的概率和可能,同时为了叙述方便,我们在文中建议大家把X控制在[M/N/2,M/N]之间。这么做到目的有几个:
    1)一个是提升CPU利用率
    2)一个是提升内存利用率
    3)再有一个,就是有效避免OOM

    2021-05-19 19:00:06

  • 西南偏北

    2021-05-04 21:10:14

    每个线程申请执行内存的时候都会跟所请求的内存大小进行比较。为一个内存消费者(MemoryConsumer)申请执行内存的具体实现逻辑在TaskMemoryManager#acquireExecutionMemory()方法中,这个方法为内存消费者(consumer)申请指定大小的内存空间,如果没有足够的内存,将会对consumer进行spill()来释放更多内存,具体要对哪些consumer进行spill(),会有一个排序算法(使用了TreeMap)。
    作者回复

    没错,老弟代码看的很细致~

    2021-05-10 22:58:40

  • 小灵芝

    2021-04-30 22:51:35

    “并行度可以通过两个参数来设置,分别是 spark.default.parallelism 和 spark.sql.shuffle.partitions。前者用于设置 RDD 的默认并行度,后者在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度。”

    请问老师,这两个参数是都要设置吗?还是说在用RDD的时候设置前者,Dataframe或者Dataset的时候设置后者即可?
    作者回复

    对,看你的应用使用哪个API,RDD的话,用前者;DF或是DS的话,用后者。

    2021-05-01 17:49:51

  • 来世愿做友人 A

    2021-04-16 00:33:38

    1. excutor 并发度如果过高,考虑极端情况,storageMemery 还没有使用,这时候这部分内存是会被 task 所抢占的,每个任务的上限是 1/N。但是对于 storage 和 execute pool 的池锁不是同一个锁。并发高的情况下,可能会出现 execute 剩余内存假设 1M,task 申请 execute 内存并且 size 申请成功,但是还没 alloc 内存。并行条件下,此时别的 task 线程刚好申请了 storage pool 的内存 0.8M 进行 block 存储,并且申请成功。然后申请 1M 的 task 线程才 allocate,却发现内存不够 OOM 了。不知道这样对不对
    2. 其实在 task 每次的 allocatePage,都会动态计算 task 当先的上下限的申请大小。满足申请条件,返回。申请后可用大小加上当前已用不满足下限,则挂起等待其它 task 唤醒抢占。
    作者回复

    第二题完美~

    第一题也对,不过算是edge case了。可以想想,一般情况下,in general,为啥task会OOM?

    2021-04-16 23:09:26

  • Fendora范东_

    2021-04-15 00:17:22

    1.当executor内所有线程都在运行,实际需要内存比预估申请内存大,这个时候执行内存又不能继续扩展,就会出现oom
    2.executor内正在运行的这批任务执行完,下一批任务被执行前,就进行资源调整。根据此时执行内存大小/min(待分配任务数,executor.cores)进行内存分配
    作者回复

    第二题答得很完美~ 💯

    第一题也对。不过,结合(1/N/2,1/N)的上下限,能再往细了说一说吗?

    2021-04-15 19:55:59