24 | Spark 3.0(一):AQE的3个特性怎么才能用好?

你好,我是吴磊。

目前,距离Spark 3.0版本的发布已经将近一年的时间了,这次版本升级添加了自适应查询执行(AQE)、动态分区剪裁(DPP)和扩展的 Join Hints 等新特性。利用好这些新特性,可以让我们的性能调优如虎添翼。因此,我会用三讲的时间和你聊聊它们。今天,我们先来说说AQE。

我发现,同学们在使用AQE的时候总会抱怨说:“AQE的开关打开了,相关的配置项也设了,可应用性能还是没有提升。”这往往是因为我们对于AQE的理解不够透彻,调优总是照葫芦画瓢,所以这一讲,我们就先从AQE的设计初衷说起,然后说说它的工作原理,最后再去探讨怎样才能用好AQE。

Spark为什么需要AQE?

在2.0版本之前,Spark SQL仅仅支持启发式、静态的优化过程,就像我们在第21、22、23三讲介绍的一样。

启发式的优化又叫RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。也就是说,启发式的优化实际上算是一种经验主义

经验主义的弊端就是不分青红皂白、胡子眉毛一把抓,对待相似的问题和场景都使用同一类套路。Spark社区正是因为意识到了RBO的局限性,因此在2.2版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO的特点是“实事求是”,基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值和直方图等等。因为有统计数据做支持,所以CBO选择的优化策略往往优于RBO选择的优化规则。

但是,CBO也面临三个方面的窘境:“窄、慢、静”。窄指的是适用面太窄,CBO仅支持注册到Hive Metastore的数据表,但在大量的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如Parquet、ORC、CSV等等。

慢指的是统计信息的搜集效率比较低。对于注册到Hive Metastore的数据表,开发者需要调用ANALYZE TABLE COMPUTE STATISTICS语句收集统计信息,而各类信息的收集会消耗大量时间。

静指的是静态优化,这一点与RBO一样。CBO结合各类统计信息制定执行计划,一旦执行计划交付运行,CBO的使命就算完成了。换句话说,如果在运行时数据分布发生动态变化,CBO先前制定的执行计划并不会跟着调整、适配。

AQE到底是什么?

考虑到RBO和CBO的种种限制,Spark在3.0版本推出了AQE(Adaptive Query Execution,自适应查询执行)。如果用一句话来概括,AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

从定义中,我们不难发现,AQE优化机制触发的时机是Shuffle Map阶段执行完毕。也就是说,AQE优化的频次与执行计划中Shuffle的次数一致。反过来说,如果你的查询语句不会引入Shuffle操作,那么Spark SQL是不会触发AQE的。对于这样的查询,无论你怎么调整AQE相关的配置项,AQE也都爱莫能助。

对于AQE的定义,我相信你还有很多问题,比如,AQE依赖的统计信息具体是什么?既定的规则和策略具体指什么?接下来,我们一一来解答。

首先,AQE赖以优化的统计信息与CBO不同,这些统计信息并不是关于某张表或是哪个列,而是Shuffle Map阶段输出的中间文件。学习过Shuffle的工作原理之后,我们知道,每个Map Task都会输出以data为后缀的数据文件,还有以index为结尾的索引文件,这些文件统称为中间文件。每个data文件的大小、空文件数量与占比、每个Reduce Task对应的分区大小,所有这些基于中间文件的统计值构成了AQE进行优化的信息来源。

其次,结合Spark SQL端到端优化流程图我们可以看到,AQE从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划。

AQE既定的规则和策略主要有4个,分为1个逻辑优化规则和3个物理优化策略。我把这些规则与策略,和相应的AQE特性,以及每个特性仰仗的统计信息,都汇总到了如下的表格中,你可以看一看。

如何用好AQE?

那么,AQE是如何根据Map阶段的统计信息以及这4个规则与策略,来动态地调整和修正尚未执行的逻辑计划和物理计划的呢?这就要提到AQE的三大特性,也就是Join策略调整、自动分区合并,以及自动倾斜处理,我们需要借助它们去分析AQE动态优化的过程。它们的基本概念我们在第9讲说过,这里我再带你简单回顾一下。

  • Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。
  • 自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。
  • 自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。

接下来,我们就一起来分析这3个特性的动态优化过程。

Join策略调整

我们先来说说Join策略调整,这个特性涉及了一个逻辑规则和一个物理策略,它们分别是DemoteBroadcastHashJoin和OptimizeLocalShuffleReader。

DemoteBroadcastHashJoin规则的作用,是把Shuffle Joins降级为Broadcast Joins。需要注意的是,这个规则仅适用于Shuffle Sort Merge Join这种关联机制,其他机制如Shuffle Hash Join、Shuffle Nested Loop Join都不支持。对于参与Join的两张表来说,在它们分别完成Shuffle Map阶段的计算之后,DemoteBroadcastHashJoin会判断中间文件是否满足如下条件:

  • 中间文件尺寸总和小于广播阈值spark.sql.autoBroadcastJoinThreshold
  • 空文件占比小于配置项spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

只要有任意一张表的统计信息满足这两个条件,Shuffle Sort Merge Join就会降级为Broadcast Hash Join。说到这儿,你可能会问:“既然DemoteBroadcastHashJoin逻辑规则可以把Sort Merge Join转换为Broadcast Join,那同样用来调整Join策略的OptimizeLocalShuffleReader规则又是干什么用的呢?看上去有些多余啊!”

不知道你注意到没有,我一直强调,AQE依赖的统计信息来自于Shuffle Map阶段生成的中间文件。这意味什么呢?这就意味着AQE在开始优化之前,Shuffle操作已经执行过半了!

我们来举个例子,现在有两张表:事实表Order和维度表User,它们的查询语句和初始的执行计划如下。

//订单表与用户表关联
select sum(order.price * order.volume), user.id
from order inner join user
on order.userId = user.id
where user.type = ‘Head Users’
group by user.id

由于两张表大都到超过了广播阈值,因此Spark SQL在最初的执行计划中选择了Sort Merge Join。AQE需要同时结合两个分支中的Shuffle(Exchange)输出,才能判断是否可以降级为Broadcast Join,以及用哪张表降级。这就意味着,不论大表还是小表都要完成Shuffle Map阶段的计算,并且把中间文件落盘,AQE才能做出决策。

你可能会说:“根本不需要大表做Shuffle呀,AQE只需要去判断小表Shuffle的中间文件就好啦”。可问题是,AQE可分不清哪张是大表、哪张是小表。在Shuffle Map阶段结束之前,数据表的尺寸大小对于AQE来说是“透明的”。因此,AQE必须等待两张表都完成Shuffle Map的计算,然后统计中间文件,才能判断降级条件是否成立,以及用哪张表做广播变量。

在常规的Shuffle计算流程中,Reduce阶段的计算需要跨节点访问中间文件拉取数据分片。如果遵循常规步骤,即便AQE在运行时把Shuffle Sort Merge Join降级为Broadcast Join,大表的中间文件还是需要通过网络进行分发。这个时候,AQE的动态Join策略调整也就失去了实用价值。原因很简单,负载最重的大表Shuffle计算已经完成,再去决定切换到Broadcast Join已经没有任何意义。

在这样的背景下,OptimizeLocalShuffleReader物理策略就非常重要了。既然大表已经完成Shuffle Map阶段的计算,这些计算可不能白白浪费掉。采取OptimizeLocalShuffleReader策略可以省去Shuffle常规步骤中的网络分发,Reduce Task可以就地读取本地节点(Local)的中间文件,完成与广播小表的关联操作。

不过,需要我们特别注意的是,OptimizeLocalShuffleReader物理策略的生效与否由一个配置项决定。这个配置项是spark.sql.adaptive.localShuffleReader.enabled,尽管它的默认值是True,但是你千万不要把它的值改为False。否则,就像我们刚才说的,AQE的Join策略调整就变成了形同虚设。

说到这里,你可能会说:“这么看,AQE的Join策略调整有些鸡肋啊!毕竟Shuffle计算都已经过半,Shuffle Map阶段的内存消耗和磁盘I/O是半点没省!”确实,Shuffle Map阶段的计算开销是半点没省。但是,OptimizeLocalShuffleReader策略避免了Reduce阶段数据在网络中的全量分发,仅凭这一点,大多数的应用都能获益匪浅。因此,对于AQE的Join策略调整,我们可以用一个成语来形容:“亡羊补牢、犹未为晚”

自动分区合并

接下来,我们再来说说自动分区合并。分区合并的原理比较简单,在Reduce阶段,当Reduce Task从全网把数据分片拉回,AQE按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起。目标分区尺寸由以下两个参数共同决定。这部分我们在第10讲详细讲过,如果不记得,你可以翻回去看一看。

  • spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。
  • spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值。

除此之外,我们还要注意,在Shuffle Map阶段完成之后,AQE优化机制被触发,CoalesceShufflePartitions策略“无条件”地被添加到新的物理计划中。读取配置项、计算目标分区大小、依序合并相邻分区这些计算逻辑,在Tungsten WSCG的作用下融合进“手写代码”于Reduce阶段执行。

自动倾斜处理

与自动分区合并相反,自动倾斜处理的操作是“拆”。在Reduce阶段,当Reduce Task所需处理的分区尺寸大于一定阈值时,利用OptimizeSkewedJoin策略,AQE会把大分区拆成多个小分区。倾斜分区和拆分粒度由以下这些配置项决定。关于它们的含义与作用,我们在第10讲说过,你可以再翻回去看一看。

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
  • spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度

自动倾斜处理的拆分操作也是在Reduce阶段执行的。在同一个Executor内部,本该由一个Task去处理的大分区,被AQE拆成多个小分区并交由多个Task去计算。这样一来,Task之间的计算负载就可以得到平衡。但是,这并不能解决不同Executors之间的负载均衡问题。

我们来举个例子,假设有个Shuffle操作,它的Map阶段有3个分区,Reduce阶段有4个分区。4个分区中的两个都是倾斜的大分区,而且这两个倾斜的大分区刚好都分发到了Executor 0。通过下图,我们能够直观地看到,尽管两个大分区被拆分,但横向来看,整个作业的主要负载还是落在了Executor 0的身上。Executor 0的计算能力依然是整个作业的瓶颈,这一点并没有因为分区拆分而得到实质性的缓解。

另外,在数据关联的场景中,对于参与Join的两张表,我们暂且把它们记做数据表1和数据表2,如果表1存在数据倾斜,表2不倾斜,那在关联的过程中,AQE除了对表1做拆分之外,还需要对表2对应的数据分区做复制,来保证关联关系不被破坏。

在这样的运行机制下,如果两张表都存在数据倾斜怎么办?这个时候,事情就开始变得逐渐复杂起来了。对于上图中的表1和表2,我们假设表1还是拆出来两个分区,表2因为倾斜也拆出来两个分区。这个时候,为了不破坏逻辑上的关联关系,表1、表2拆分出来的分区还要各自复制出一份,如下图所示。

如果现在问题变得更复杂了,左表拆出M个分区,右表拆出N各分区,那么每张表最终都需要保持M x N份分区数据,才能保证关联逻辑的一致性。当M和N逐渐变大时,AQE处理数据倾斜所需的计算开销将会面临失控的风险。

总的来说,当应用场景中的数据倾斜比较简单,比如虽然有倾斜但数据分布相对均匀,或是关联计算中只有一边倾斜,我们完全可以依赖AQE的自动倾斜处理机制。但是,当我们的场景中数据倾斜变得复杂,比如数据中不同Key的分布悬殊,或是参与关联的两表都存在大量的倾斜,我们就需要衡量AQE的自动化机制与手工处理倾斜之间的利害得失。关于手工处理倾斜,我们留到第28讲再去展开。

小结

AQE是Spark SQL的一种动态优化机制,它的诞生解决了RBO、CBO,这些启发式、静态优化机制的局限性。想要用好AQE,我们就要掌握它的特点,以及它支持的三种优化特性的工作原理和使用方法。

如果用一句话来概括AQE的定义,就是每当Shuffle Map阶段执行完毕,它都会结合这个阶段的统计信息,根据既定的规则和策略动态地调整、修正尚未执行的逻辑计划和物理计划,从而完成对原始查询语句的运行时优化。也因此,只有当你的查询语句会引入Shuffle操作的时候,Spark SQL才会触发AQE。

AQE支持的三种优化特性分别是Join策略调整、自动分区合并和自动倾斜处理。

关于Join策略调整,我们首先要知道DemoteBroadcastHashJoin规则仅仅适用于Shuffle Sort Merge Join这种关联机制,对于其他Shuffle Joins类型,AQE暂不支持把它们转化为Broadcast Joins。其次,为了确保AQE的Join策略调整正常运行,我们要确保spark.sql.adaptive.localShuffleReader.enabled配置项始终为开启状态。

关于自动分区合并,我们要知道,在Shuffle Map阶段完成之后,结合分区推荐尺寸与分区数量限制,AQE会自动帮我们完成分区合并的计算过程。

关于AQE的自动倾斜处理我们要知道,它只能以Task为粒度缓解数据倾斜,并不能解决不同Executors之间的负载均衡问题。针对场景较为简单的倾斜问题,比如关联计算中只涉及单边倾斜,我们完全可以依赖AQE的自动倾斜处理机制。但是,当数据倾斜问题变得复杂的时候,我们需要衡量AQE的自动化机制与手工处理倾斜之间的利害得失。

每日一练

  1. 我们知道,AQE依赖的统计信息来源于Shuffle Map阶段输出的中间文件。你觉得,在运行时,AQE还有其他渠道可以获得同样的统计信息吗?
  2. AQE的自动倾斜处理机制只能以Task为粒度来平衡工作负载,如果让你重新实现这个机制,你有什么更好的办法能让AQE以Executors为粒度做到负载均衡吗?

期待在留言区看到你的思考和答案,我们下一讲见!

精选留言

  • 周俊

    2021-08-16 14:55:07

    老师,

    //订单表与用户表关联
    select sum(order.price * order.volume), user.id
    from order inner join user
    on order.userId = user.id
    where user.type = ‘Head Users’
    group by user.id
    这段代码中,catalyst会做谓词下推处理,将过滤条件放到贴近数据源扫描的地方,这里过滤之后user表的大小已经属于小表变成广播变量的阈值了吧,为什么还会有AQE呢
    作者回复

    好问题~

    推出AQE之前,Spark SQL的静态优化没有那么智能。举例来说,表A:10G;表2:2G,表2过滤之后100MB,然后比如把广播阈值设置为1GB。讲道理,理论上,过滤之后应该选择Broadcast Join才对。

    但是,Spark SQL在静态优化期间,会根据表A、B的存储大小,来判断选择什么Join策略,由于表A、B都大于1GB,所以Spark SQL在优化阶段自然选择Shuffle Join。它在优化阶段,并不知道filter之后,表B会变小,小到足以放进广播变量。这个正是静态优化的限制与痛点,是AQE设计初衷之一。

    但是AQE就不同,根据Shuffle过后的中间文件,Spark SQL可以动态地决定,把原理的Shuffle Join调整为Broadcast Join。

    2021-08-19 18:38:01

  • Fendora范东_

    2021-05-09 23:43:17

    磊哥,请教下
    Join策略调整里面,为啥只有SMJ能降级,而其他Join比如SHJ不能降级
    作者回复

    好问题~

    其实这个问题我特别想让你独立思考,你先花5分钟想想,结合咱们之前讲过的内容,然后再看答复~

    —————————给你留个分割线,哈哈————————————————

    首先,Spark有个参数:spark.sql.join.prefersortmergeJoin,它默认是true,也就是说,对于Shuffle Joins,Spark SQL会prefer也就是偏向优先选择Shuffle Sort Merge Join,换句话说,如果你不动这个参数的话,那么Spark SQL根本不会去考虑用Shuffle Hash Join。这么说吧,在Spark SQL眼里,Shuffle Hash Join的地位非常的低,虽然这一点我不认可,但是Spark就是这么干的。

    所以,如果你想启用Shuffle Hash Join,那就必须把spark.sql.join.prefersortmergeJoin置为false。这还不算完,即便如此,Spark SQL其实还是会倾向使用Shuffle Sort Merge Join。当且仅当如下条件成立的时候,Spark才有可能考虑Shuffle Hash Join:
    1)外表比内表至少大3倍
    2)内表所有数据分片,都要小于广播变量

    你说说,这个条件苛刻不?因此,想要启用Shuffle Hash Join,开发者就只剩下一条路可走:Join hints。也就是使用.hint("SHUFFLE_HASH")关键字,来强制Spark SQL选择Shuffle Hash Join。但是这个时候,你都使用强制机制了,Spark SQL会走单独的code path,不用再去利用AQE的Join策略调整、去看看能不能转Broadcast Joins。

    因此,总结下来,在AQE的优化机制下,只有SMJ会降级为Broadcast Hash Join,压根跟Shuffle Hash Join没啥关系~

    2021-05-13 18:13:13

  • sparkjoy

    2021-08-27 01:10:19

    老师,为什么不在join之前统计两个表的大小,从而决定是否用BHJ,而是map结束之后才根据shuffle文件总大小去判断呢?
    作者回复

    spark的计算模式是惰性求值,在没有action的情况,对于数据集大小的判断,其实都是一种基于统计的猜测,这个就是CBO的工作原理。CBO要想work,必须要先用ANALYZE TABLE来统计数据表的各种信息。

    AQE相反,放弃了这种思路,不再提取计算统计信息,而是根据运行时的反馈,来动态优化。shuffle本身,你可以把它看作是一种“action”,因为map阶段要落盘,reduce阶段在本质上,是另一个Stage的“map”阶段。下一个stage通过磁盘与上一个stage交换数据。也就是说,Spark能利用的运行时数据,只有map阶段输出的中间文件,对于数据表的大小也好,过滤之后的表大小也好,如果没有shuffle map的中间文件,spark并没有哪里可以获取关于数据表大小、分布这类信息。因此,AQE必须依赖shuffle中间文件,来完成动态的判断。

    2021-09-04 22:39:57

  • zxk

    2021-05-07 21:39:48

    问题一:个人认为 AQE 可以在加载读取文件的时候获取一些运行时信息,或者做 cache 的时候。这里也有个疑问,就是 AQE 会不会根据这些信息也进行一些优化?
    问题二想到两种方法,不知道是否可行:
    1. Spark 有一个动态分配调整 executor 的功能,在 Shuffle Map 阶段由 Driver 端汇聚信息决定好倾斜数据的切割方式,之后部分数据发送到原有的 executor 上,切割出来的数据发送到新的 executor 上,同时也需要注意对应做关联的数据也需要复制一份传输到新的 executor 上,但这样会带来 driver 端决策的开销、新的 executor 调度开销以及关联数据额外复制并通过网络传输的开销。
    2. 仍按照原来的方式进行,但在 Reduce 阶段切割数据后,起一个新的 executor 来分担切割后的数据,并通知 driver 端。如果能够在同节点上新起 executor,还可以消除网络之间的传输,只做进程间的数据传输即可。

    这里想向老师请教一个关于 Join 策略调整的问题,如果 a、b 为事实表,c 为维度表,a、c 做关联后 c 从原来的 SMJ 被 AQE 优化为了 BHJ 后,如果紧接着 b 又跟 c 做关联,那么 Spark 是否会直接使用
    BHJ,还是仍需要将 b、c 做 SHuffle Map 之后才能优化为 BHJ?
    作者回复

    先来说说你提的问题,好问题~

    这里面涉及到一个多表Join的执行过程问题,先说结论,无论你的查询涉及多少张表,所有的数据库引擎(包括Spark SQL在内),都是以两表Join为粒度,逐渐地完成Join的。以你的例子来说,a、b、c三张表,a、c先关联,那么这个时候,a、c会生成关联后的中间结果tmp,后续和b表关联的,是这个“tmp表”,已经不再是你说的c表了。因此,回答你的问题,a、c关联利用AQE转化为BHJ,那么后续Spark SQL需要继续关联tmp和b,这个时候,AQE会继续判定tmp结果集大小,如果小于广播阈值,那么它就会把tmp与b的关联,继续转化为BHJ。相反,如果tmp很大,那么Spark SQL就用默认的SMJ。

    再回过头来说问题一,现有的AQE机制,是不会利用你说的那些信息的,不过这正是问题所在,就是AQE可不可以、应不应该利用其它的运行时信息,我个人认为,是可以的,只不过这里还是要平衡一个运行时信息统计效率的问题,这个是关键所在~

    关于问题二,你说的特性,应该是Spark的Dynamic allocation,也就是以Executors为粒度,根据任务的计算负载,动态缩放Spark的分布式集群。这个确实是个不错的思路,不过呢,如果仅仅是依赖于集群的缩放,那么其实还是解决不了以task为粒度做负载均衡的问题,因为不论你有多少Executors,倾斜的数据,还是会分发到某一个Executor上面去。这块的答案其实是“两阶段Shuffle”,第一阶段通过加盐, 把数据打散,这样数据就会均衡地分发到集群内所有的Executors,解决了Executors之间的负载均衡问题;第二阶段是去盐化,也就是把第一阶段添加的随机“盐粒”去掉,保证关联关系的一致性。关键两阶段Shuffle的细节,可以参考第29讲哈~

    2021-05-13 22:45:33

  • sparkjoy

    2021-08-26 21:51:12

    老师,cbo的信息是存在表的元信息里吗?
    作者回复

    对,CBO相关的元信息,都存储在Hive表,使用ANALYZE语句分析得到的统计信息,都会记录到那些数据表里面去~

    2021-08-27 13:39:31

  • A

    2021-07-21 14:50:06

    spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。分区合并的最终尺寸这个配置应该是个建议值吧?min(maxTargetSize,advisoryTargetSize)会通过这行代码来取advisoryTargetSize
    作者回复

    是的,没错,就是你说的这个逻辑~

    2021-07-28 20:41:08

  • wayne

    2021-12-20 20:49:29

    老师,请问 aqe可以强制指定输出文件的大小吗?比如强制设置 设置分区文件大小为128m
    作者回复

    不可以哈,AQE并不能强制每个分区大小是多少,只能是根据诸如:
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
    spark.sql.adaptive.advisoryPartitionSizeInBytes
    spark.sql.adaptive.coalescePartitions.minPartitionNum
    来决定什么时候拆分、什么时候合并

    再者,实际上强制分区大小,也是没有道理的,合理的做法,是根据“三足鼎立”的原则,灵活地设置CPU、内存和并行度,来决定每个分区的大小~

    2021-12-24 22:38:25

  • To_Drill

    2021-11-30 11:12:23

    老师您好,关于文章中的图-两边倾斜中的关联箭头指向有点疑问,如果两张表的关联分区都发生了倾斜,然后都进行了拆分,那么拆分之后的子分区中的是局部数据,应该和对方的全量数据都关联下才能保证原来的关联逻辑,这样的话各个子分区的关联箭头指向是不是应该如下图所示呢?
    拆分-01-拷贝 -> 拆分-01
    拆分-01 -> 拆分-02
    拆分-02 -> 拆分-01-拷贝
    拆分-02-拷贝 -> 拆分-02-拷贝
    作者回复

    对的,对的~ 老弟说的对~ 是应该像你说的那样,交叉组合,原图画的有问题,回头我改下,感谢老弟提醒~

    2021-11-30 23:39:53

  • wow_xiaodi

    2021-08-13 00:14:50

    老师,请教下:
    (1)尽管两个大分区被拆分,但横向来看,整个作业的主要负载还是落在了 Executor 0 的身上。Executor 0 的计算能力依然是整个作业的瓶颈,这一点并没有因为分区拆分而得到实质性的缓解。这里不是很明白为何分区拆分后都落到一个executor上呢?
    (2)aqe的分区拆分底层也是走两阶段shuffle聚合吗?
    作者回复

    好问题,这两个问题可以一起回答~

    首先,AQE的分区拆分,底层用的不是两阶段Shuffle。正因为如此,在Reduce阶段拆分的分区,还是落在同一个Executors内,否则的话,AQE就需要再一次做Shuffle。道理很简单,本来属于一个Executors的数据,被拆成多份,然后这些小份数据,如果要分发到别的Executors,本质上就是Shuffle。

    总结下来,AQE的拆分逻辑相对比较简单,主要是在Task粒度做拆分,并没有从Executors的粒度做拆分,因此拆分之后,Executors之间的负载问题,并没有得到本质的解决。

    再者,拆分不会引入额外的Shuffle,其实原因很简单,如果优化方法中又引入了Shuffle,那么其实优化方法本身是否真的有帮助,就非常值得探讨了。因为新的优化方法,在尝试解决原有问题的同时,又引入了新的问题:Shuffle的资源开销。

    2021-08-16 17:35:18

  • 西南偏北

    2021-05-14 19:48:09

    1. 如果不用Map阶段的输出文件的话,那应该就是实时统计了吧,比如"ANALYZE TABLE COMPUTE STATISTICS"
    2. 为了防止倾斜分区都出现在同一个Executor的情况,可以考虑对倾斜数据的key进行加前缀,然后再将这些数据进行一下重分区repartition(),分区数指定为executor的个数。但是,由于使用了repartition(),也就引入了shuffle开销,这个也是一个要平衡的问题
    作者回复

    第二题满分💯,没错,要实现Executors级别的均衡,那确实需要加盐来处理,也就是把Join Keys打散,这样哈希过后再分发数据,数据在Executors中的分布就会比较均衡。

    第一题实时统计的思路也没问题~ 这个思路其实就是传统DBMS的真正的CBO,实时计算每一步的costs,然后按照costs动态做优化。

    2021-05-19 14:53:26

  • Stony.修行僧

    2021-05-07 14:28:38

    老师请教一个问题,一个10g的csv 文件,里面有4个字段,其中三个字段需要做匿名化。在匿名化三个字段里过程中,可以partition第四个字段来提高性能吗?求老师的意见和建议
    作者回复

    我先说说对于问题的理解哈,不确定我的理解是不是对的。

    你的诉求:一个csv格式的文件,4个字段,目标是对其中3个字段做匿名化,想问按照第四个字段做分布式计算来提高执行效率行不行。

    如果是这样的话,那我觉得完全没问题,如果第四个字段分布比较均匀,不存在明显的倾斜问题,我觉得完全可以啊,而且就是应该这么做来提升执行效率。

    如果有倾斜的话,你还可以手工加盐处理,手工加盐的方法我们29讲会细说,到时候可以重点关注下~

    2021-05-07 22:26:51

  • guanyao

    2025-08-14 06:34:04

    老师,针对数据倾斜,aqe只能在task层面把大的task拆分成小的,它们一定依然在同一executor里面执行吗,我不理解这样有啥好处。比如,原来一个executor可以同时4个core,意思就是同时run4个partition,假设原来有10个partition分到这个executor,现在拆分之后变成了11个,那么只有很小的概率那两个大的task不会同时run,如果同时run,还是会oom之类的,不知道我的理解对不对。谢谢
  • really_z

    2024-10-12 22:12:10

    生产情况遇到这样的问题,有张表是标签表,大量的1和0 ,发现aqe自动优化成了broadcast join,就发生了oom。再隐式转化成sort merge join 就没有oom。想问一下aqe是怎么评估的呢,难道中间文件是会被压缩后评估的么
  • Geek1185

    2023-03-28 10:15:47

    老师这里该如何理解呢?“除此之外,我们还要注意,在 Shuffle Map 阶段完成之后,AQE 优化机制被触发,CoalesceShufflePartitions 策略“无条件”地被添加到新的物理计划中。”是指只要有shuffle,那么就会在exchange节点下增加customshuffleread coalesce节点吗
  • 你挺淘啊

    2022-07-17 18:25:28

    问下老师,自动分区合并那里的两个参数,spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。
    spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值。
    为什么有了推荐尺寸后,还要一个分区数的参数呢?这两个参数在实际中要如何一起使用呢?能举个例子说明吗,多谢老师
  • 我爱夜来香

    2022-05-01 15:47:44

    老师,有一点不大理解,一般都是把相同的key放到一个reducer执行,倾斜分区打散后最终还是会发到同一个reducer吗
  • CRT

    2022-01-18 13:24:50

    老师,既然分区合并是要等shuffle map阶段结束之后才可以适用,那么shuffle map阶段原本会生成多少个task,应该怎么决定呢?
    作者回复

    我们把shuffle之前的stage称作stage0,也就是shuffle map stage,把shuffle过后的stage,成为stage1,也就是reduce阶段的stage。
    Stage0的并行度,或者说有多少task,是由stage0里面第一个RDD/DataFrame决定的。而Reduce阶段的stage1,它的并行度,或者说tasks数量,它是由配置项spark.sql.shuffle.partitions决定的

    2022-01-21 23:27:49

  • Geek_c8148b

    2022-01-15 21:24:45

    想问一下文中提到“每个 data 文件的大小、空文件数量与占比、每个 Reduce Task 对应的分区大小,所有这些基于中间文件的统计值构成了 AQE 进行优化的信息来源”,这里为啥会出现“每个 Reduce Task 对应的分区大小” ?有点不理解~ 希望解答一下
    作者回复

    结合Shuffle中间文件,这些metrics都是可以计算出来的。比如说,对于某个reduce task,其实只要结合index文件中reduce task的索引,就能计算所有data文件中,对应index为止所有数据的大小,从而判定reduce task所需要处理的数据量~

    2022-01-21 23:17:50

  • 农夫三拳

    2021-11-05 09:19:21

    问题2: 可否采用自动分区合并的思路。 判断发送到某个executor的数据量过大。 预判会发生倾斜。 不是join情况下 :就让其他的executor拉取一部分数据做计算。 join情况下: 依然是一部分数据拉取到其他executor 同时关联的数据做复制 也拉取过去。 不知道是否可行?
    作者回复

    两个点需要考虑哈:
    1)首先如何在Executors粒度上判断倾斜,这个需要拿到除了Task以外,更多的信息,比如Task所属的Executors,这个信息肯定可以拿到,所以不是什么大问题
    2)不论是否有join,最终的聚合操作,都是要落到同一个Executor JVM进程,才能计算,所以老弟需要考虑这一个点~

    这块的答案其实是“两阶段Shuffle”,第一阶段通过加盐, 把数据打散,这样数据就会均衡地分发到集群内所有的Executors,解决了Executors之间的负载均衡问题;第二阶段是去盐化,也就是把第一阶段添加的随机“盐粒”去掉,保证关联关系的一致性。两阶段Shuffle的细节,可以参考第29讲哈~

    2021-11-05 15:28:50

  • sparkjoy

    2021-08-28 23:53:06

    老师,DemoteBroadcastHashJoin 策略在本文中的表格中说是对于逻辑计划部分的优化,我理解join策略应该属于物理计划,这个为什么是对逻辑计划的优化呢?
    作者回复

    DemoteBroadcastHashJoin属于优化规则,它的作用,是在运行时,调整逻辑计划(Logical Plan),所以属于逻辑计划那部分的优化。

    你说的对,绝大部分的Join策略相关的,其实都是在物理计划阶段才优化。不过DemoteBroadcastHashJoin这个规则,直接改写已有的逻辑计划,属于逻辑优化的范畴~

    2021-09-06 22:17:53