12 | 广播变量(一):克制Shuffle,如何一招制胜!

你好,我是吴磊。

在数据分析领域,数据关联(Joins)是Shuffle操作的高发区,二者如影随从。可以说,有Joins的地方,就有Shuffle。

我们说过,面对Shuffle,开发者应当“能省则省、能拖则拖”。我们已经讲过了怎么拖,拖指的就是,把应用中会引入Shuffle的操作尽可能地往后面的计算步骤去拖。那具体该怎么省呢?

在数据关联场景中,广播变量就可以轻而易举地省去Shuffle。所以今天这一讲,我们就先说一说广播变量的含义和作用,再说一说它是如何帮助开发者省去Shuffle操作的。

如何理解广播变量?

接下来,咱们借助一个小例子,来讲一讲广播变量的含义与作用。这个例子和Word Count有关,它可以说是分布式编程里的Hello world了,Word Count就是用来统计文件中全部单词的,你肯定已经非常熟悉了,所以,我们例子中的需求增加了一点难度,我们要对指定列表中给定的单词计数。

val dict = List(“spark”, “tune”)
val words = spark.sparkContext.textFile(“~/words.csv”)
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

按照这个需求,同学小A实现了如上的代码,一共有4行,我们逐一来看。第1行在Driver端给定待查单词列表dict;第2行以textFile API读取分布式文件,内容包含一列,存储的是常见的单词;第3行用列表dict中的单词过滤分布式文件内容,只保留dict中给定的单词;第4行调用reduceByKey对单词进行累加计数。

学习过调度系统之后,我们知道,第一行代码定义的dict列表连带后面的3行代码会一同打包到Task里面去。这个时候,Task就像是一架架小飞机,携带着这些“行李”,飞往集群中不同的Executors。对于这些“行李”来说,代码的“负重”较轻,可以忽略不计,而数据的负重占了大头,成了最主要的负担。

你可能会说:“也还好吧,dict列表又不大,也没什么要紧的”。但是,如果我们假设这个例子中的并行度是10000,那么,Driver端需要通过网络分发总共10000份dict拷贝。这个时候,集群内所有的Executors需要消耗大量内存来存储这10000份的拷贝,对宝贵的网络和内存资源来说,这已经是一笔不小的浪费了。更何况,如果换做一个更大的数据结构,Task分发所引入的网络与内存开销会更可怕。

换句话说,统计计数的业务逻辑还没有开始执行,Spark就已经消耗了大量的网络和存储资源,这简直不可理喻。因此,我们需要对示例中的代码进行优化,从而跳出这样的窘境。

但是,在着手优化之前,我们不妨先来想一想,现有的问题是什么,我们要达到的目的是什么。结合刚刚的分析,我们不难发现,Word Count的核心痛点在于,数据结构的分发和存储受制于并行,并且是以Task为粒度的,因此往往频次过高。痛点明确了,调优的目的也就清晰了,我们需要降低数据结构分发的频次

要达到这个目的,我们首先想到的就是降低并行度。不过,牵一发而动全身,并行度一旦调整,其他与CPU、内存有关的配置项都要跟着适配,这难免把调优变复杂了。实际上,要降低数据结构的分发频次,我们还可以考虑广播变量。

广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发。换句话说,在广播变量的工作机制下,数据分发的频次等同于集群中的Executors个数。通常来说,集群中的Executors数量都远远小于Task数量,相差两到三个数量级是常有的事。那么,对于第一版的Word Count实现,如果我们使用广播变量的话,会有哪些变化呢?

代码的改动很简单,主要有两个改动:第一个改动是用broadcast封装dict列表,第二个改动是在访问dict列表的地方改用broadcast.value替代。

val dict = List(“spark”, “tune”)
val bc = spark.sparkContext.broadcast(dict)
val words = spark.sparkContext.textFile(“~/words.csv”)
val keywords = words.filter(word => bc.value.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

你可能会说:“这个改动看上去也没什么呀!”别着急,我们先来分析一下,改动之后的代码在运行时都有哪些变化。

在广播变量的运行机制下,封装成广播变量的数据,由Driver端以Executors为粒度分发,每一个Executors接收到广播变量之后,将其交给BlockManager管理。由于广播变量携带的数据已经通过专门的途径存储到BlockManager中,因此分发到Executors的Task不需要再携带同样的数据。

这个时候,你可以把广播变量想象成一架架专用货机,专门为Task这些小飞机运送“大件行李”。Driver与每一个Executors之间都开通一条这样的专用货机航线,统一运载负重较大的“数据行李”。有了专用货机来帮忙,Task小飞机只需要携带那些负重较轻的代码就好了。等这些Task小飞机在Executors着陆,它们就可以到Executors的公用仓库BlockManager里去提取它们的“大件行李”。

总之,在广播变量的机制下,dict列表数据需要分发和存储的次数锐减。我们假设集群中有20个Executors,不过任务并行度还是10000,那么,Driver需要通过网络分发的dict列表拷贝就会由原来的10000份减少到20份。同理,集群范围内所有Executors需要存储的dict拷贝,也由原来的10000份,减少至20份。这个时候,引入广播变量后的开销只是原来Task分发的1/500!

广播分布式数据集

那在刚刚的示例代码中,广播变量封装的是Driver端创建的普通变量:字符串列表。除此之外,广播变量也可以封装分布式数据集

我们来看这样一个例子。在电子商务领域中,开发者往往用事实表来存储交易类数据,用维度表来存储像物品、用户这样的描述性数据。事实表的特点是规模庞大,数据体量随着业务的发展不断地快速增长。维度表的规模要比事实表小很多,数据体量的变化也相对稳定。

假设用户维度数据以Parquet文件格式存储在HDFS文件系统中,业务部门需要我们读取用户数据并创建广播变量以备后用,我们该怎么做呢?很简单,几行代码就可以搞定!

val userFile: String = “hdfs://ip:port/rootDir/userData”
val df: DataFrame = spark.read.parquet(userFile)
val bc_df: Broadcast[DataFrame] = spark.sparkContext.broadcast(df)

首先,我们用Parquet API读取HDFS分布式数据文件生成DataFrame,然后用broadcast封装DataFrame。从代码上来看,这种实现方式和封装普通变量没有太大差别,它们都调用了broadcast API,只是传入的参数不同。

但如果不从开发的视角来看,转而去观察运行时广播变量的创建过程的话,我们就会发现,分布式数据集与普通变量之间的差异非常显著。

从普通变量创建广播变量,由于数据源就在Driver端,因此,只需要Driver把数据分发到各个Executors,再让Executors把数据缓存到BlockManager就好了。

但是,从分布式数据集创建广播变量就要复杂多了,具体的过程如下图所示。

与普通变量相比,分布式数据集的数据源不在Driver端,而是来自所有的Executors。Executors中的每个分布式任务负责生产全量数据集的一部分,也就是图中不同的数据分区。因此,步骤1就是Driver从所有的Executors拉取这些数据分区,然后在本地构建全量数据。步骤2与从普通变量创建广播变量的过程类似。 Driver把汇总好的全量数据分发给各个Executors,Executors将接收到的全量数据缓存到存储系统的BlockManager中

不难发现,相比从普通变量创建广播变量,从分布式数据集创建广播变量的网络开销更大。原因主要有二:一是,前者比后者多了一步网络通信;二是,前者的数据体量通常比后者大很多。

如何用广播变量克制Shuffle?

你可能会问:“Driver从Executors拉取DataFrame的数据分片,揉成一份全量数据,然后再广播出去,抛开网络开销不说,来来回回得费这么大劲,图啥呢?”这是一个好问题,因为以广播变量的形式缓存分布式数据集,正是克制Shuffle杀手锏。

Shuffle Joins

为什么这么说呢?我还是拿电子商务场景举例。有了用户的数据之后,为了分析不同用户的购物习惯,业务部门要求我们对交易表和用户表进行数据关联。这样的数据关联需求在数据分析领域还是相当普遍的。

val transactionsDF: DataFrame = _
val userDF: DataFrame = _
transactionsDF.join(userDF, Seq(“userID”), “inner”)

因为需求非常明确,同学小A立即调用Parquet数据源API,读取分布式文件,创建交易表和用户表的DataFrame,然后调用DataFrame的Join方法,以userID作为Join keys,用内关联(Inner Join)的方式完成了两表的数据关联。

在分布式环境中,交易表和用户表想要以userID为Join keys进行关联,就必须要确保一个前提:交易记录和与之对应的用户信息在同一个Executors内。也就是说,如果用户黄小乙的购物信息都存储在Executor 0,而个人属性信息缓存在Executor 2,那么,在分布式环境中,这两种信息必须要凑到同一个进程里才能实现关联计算。

在不进行任何调优的情况下,Spark默认采用Shuffle Join的方式来做到这一点。Shuffle Join的过程主要有两步。

第一步就是对参与关联的左右表分别进行Shuffle,Shuffle的分区规则是先对Join keys计算哈希值,再把哈希值对分区数取模。由于左右表的分区数是一致的,因此Shuffle过后,一定能够保证userID相同的交易记录和用户数据坐落在同一个Executors内。

Shuffle完成之后,第二步就是在同一个Executors内,Reduce task就可以对userID一致的记录进行关联操作。但是,由于交易表是事实表,数据体量异常庞大,对TB级别的数据进行Shuffle,想想都觉得可怕!因此,上面对两个DataFrame直接关联的代码,还有很大的调优空间。我们该怎么做呢?话句话说,对于分布式环境中的数据关联来说,要想确保交易记录和与之对应的用户信息在同一个Executors中,我们有没有其他办法呢?

克制Shuffle的方式

还记得之前业务部门要求我们把用户表封装为广播变量,以备后用吗?现在它终于派上用场了!

import org.apache.spark.sql.functions.broadcast
 
val transactionsDF: DataFrame = _
val userDF: DataFrame = _
 
val bcUserDF = broadcast(userDF)
transactionsDF.join(bcUserDF, Seq(“userID”), “inner”)

Driver从所有Executors收集userDF所属的所有数据分片,在本地汇总用户数据,然后给每一个Executors都发送一份全量数据的拷贝。既然每个Executors都有userDF的全量数据,这个时候,交易表的数据分区待在原地、保持不动,就可以轻松地关联到一致的用户数据。如此一来,我们不需要对数据体量巨大的交易表进行Shuffle,同样可以在分布式环境中,完成两张表的数据关联。

利用广播变量,我们成功地避免了海量数据在集群内的存储、分发,节省了原本由Shuffle引入的磁盘和网络开销,大幅提升运行时执行性能。当然,采用广播变量优化也是有成本的,毕竟广播变量的创建和分发,也是会带来网络开销的。但是,相比大表的全网分发,小表的网络开销几乎可以忽略不计。这种小投入、大产出,用极小的成本去博取高额的性能收益,真可以说是“四两拨千斤”!

小结

在数据关联场景中,广播变量是克制Shuffle的杀手锏。掌握了它,我们就能以极小的成本,获得高额的性能收益。关键是我们要掌握两种创建广播变量的方式。

第一种,从普通变量创建广播变量。在广播变量的运行机制下,普通变量存储的数据封装成广播变量,由Driver端以Executors为粒度进行分发,每一个Executors接收到广播变量之后,将其交由BlockManager管理。

第二种,从分布式数据集创建广播变量,这就要比第一种方式复杂一些了。第一步,Driver需要从所有的Executors拉取数据分片,然后在本地构建全量数据;第二步,Driver把汇总好的全量数据分发给各个Executors,Executors再将接收到的全量数据缓存到存储系统的BlockManager中。

结合这两种方式,我们在做数据关联的时候,把Shuffle Joins转换为Broadcast Joins,就可以用小表广播来代替大表的全网分发,真正做到克制Shuffle。

每日一练

  1. Spark广播机制现有的实现方式是存在隐患的,在数据量较大的情况下,Driver可能会成为瓶颈,你能想到更好的方式来重新实现Spark的广播机制吗?(提示:SPARK-17556
  2. 在什么情况下,不适合把Shuffle Joins转换为Broadcast Joins?

期待在留言区看到你的思考和答案,我们下一讲见!

精选留言

  • Sansi

    2021-04-09 09:44:02

    1. 改成由driver获取到数据分布,然后通知各个executor之间进行拉取,这样可以利用多个executor网络,避免只有driver组装以后再一个一个发送效率过低

    2.当两个需要join的数据集都很大时,使用broadcast join需要将一个很大的数据集进行网络分发多次,已经远超出了shuffle join需要传输的数据
    作者回复

    Perfect!满分💯,两道题答的都很好~

    2021-04-09 15:58:11

  • Geek_d794f8

    2021-05-12 22:45:03

    磊哥,为什么我测试了广播rdd不行:
    我写了个demo,广播rdd是报错的,代码如下:
    val userFile: String ="spark-basic/File/csv_data.csv"
    val df: DataFrame = spark.read.csv(userFile)
    val rdd = spark.sparkContext.textFile("userFile")
    val bc_df: Broadcast[RDD[String]] = spark.sparkContext.broadcast(rdd)
    bc_df.value.collect().foreach(println)

    报错如下:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Can not directly broadcast RDDs; instead, call collect() and broadcast the result.

    然后看了一下源码:SparkContext中的broadcast方法:

    def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
    }

    第4行的代码显示的Can not directly broadcast RDDs
    是不是我哪里不太对?
    作者回复

    你是对的,确实会报错。这里我没有交代清楚,RDD确实不能直接用广播变量封装,它不像DataFrame,DataFrame广播的那部分源码在内部把collect这个事做了,所以你可以直接用广播封装DataFrame,但是RDD没有,确实需要先手动collect RDD数据集,然后再在driver端用广播变量封装,我的锅,没有交代清楚~

    不过,歪打正着,通过这个例子,你可以更好地理解Driver在构建广播变量时的计算过程,也就是第一步都是把数据集collect到Driver端,不管是RDD、DataFrame、Dataset,区别无非是collect这件事是谁做的。RDD是开发者来做,而DataFrame、Dataset是Spark自己“偷偷”做了。

    2021-05-13 17:55:39

  • Jack

    2021-04-09 23:16:42

    老师,对于第1题,看了下spark的源码,目前Broadcast只有一个实现类TorrentBroadcast,看代码的注释,这个类通过使用类似Bit-torrent协议的方法解决了Driver成为瓶颈的问题。目前Spark还会存在广播变量的数据太大造成Driver成为瓶颈的问题吗?

    /**
    * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    *
    * The mechanism is as follows:
    *
    * The driver divides the serialized object into small chunks and
    * stores those chunks in the BlockManager of the driver.
    *
    * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    * other executors if available. Once it gets the chunks, it puts the chunks in its own
    * BlockManager, ready for other executors to fetch from.
    *
    * This prevents the driver from being the bottleneck in sending out multiple copies of the
    * broadcast data (one per executor).
    *
    * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
    *
    * @param obj object to broadcast
    * @param id A unique identifier for the broadcast variable.
    */
    private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
    extends Broadcast[T](id) with Logging with Serializable {
    作者回复

    非常赞哈👍哈~ 凡是看源码的同学,都先给个赞~

    这块非常值得探讨。我的理解是这样的,代码层面,spark确实已经有code在尝试用p2p的方式来分发广播变量,从而减轻driver负担。

    但是,据我观察,这部分代码尚未生效。细节可以参考这个ticket:【Executor side broadcast for broadcast joins】https://issues.apache.org/jira/browse/SPARK-17556,看上去还是进行中的状态。

    另外,从代码看,目前还是先用collect拉到driver,然后再分发出去:

    BroadcastExchangeExec中的relationFuture用于获取广播变量内容
    在relationFuture内部:
    1. 先是调用executeCollectIterator生成内容relation;
    其中,executeCollectIterator调用collect把结果集收集到driver端
    2. 然后用sparkContext.broadcast(relation),把生成好的内容广播到各个Executors
    并没有看到哪里从Executors拉取数据分片、来减轻driver负载。

    并且,这里还有提示driver内存不够的exception:
    new OutOfMemoryError("Not enough memory to build and broadcast the table to all " + "worker nodes. As a workaround, you can either disable broadcast by setting " + s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " + s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.").initCause(oe.getCause))

    你贴的这段代码确实在尝试用片2片,不过,需要仔细看看,它在哪里调用,被谁调用~

    2021-04-10 19:54:46

  • 冯杰

    2021-06-10 10:48:08

    老师你好,关于broacast join,遇到了一个特别的问题请教一下。
    1、Fact(订单) 和 DIM(门店) 关联。 其中门店表量级为 3w(条数) * 10个(字段),采用Parquet存储在Hive上,大小1M左右。
    2、运行参数,并行度 = 200,Executor = 50,CPU核数 = 2,内存/Executor = 6G,Drvier内存=2G。 PS:没有特别配置Broadcast 相关参数
    3、执行时,有两个疑问点,不得其解
    a)Spark UI 显示,并没有执行BHJ,反而执行了 hash sort merge join。 照理,如此小的数据,应该走前者
    b)Spark UI 显示,走hash sort merge join后,shuffle阶段的内存计算大小为4MB,sort阶段的内存计算大小为6G。 为何sort完后,为膨胀的如此厉害。

    作者回复

    先来说第一个问题~

    a)你说的没错,按理说,应该走广播,毕竟广播阈值默认10MB,你的数据集足够小,磁盘上1MB,内存里4MB,怎么看都比广播阈值小。我能想到的可能的原因,就是Spark SQL对于数据集大小的误判,就是它对于DIM的预估大于10MB。你不妨用下面这个方法,计算一下DIM在Spark SQL下的预判大小:

    val df: DataFrame = _
    df.cache.count

    val plan = df.queryExecution.logical
    val estimated: BigInt = spark
    .sessionState
    .executePlan(plan)
    .optimizedPlan
    .stats
    .sizeInBytes

    看看能不能发现什么端倪。虽然咱们不能完全确认原因,不过要解决这个问题倒是蛮简单的。一来可以用broadcast函数,二来可以用join hints,总之就是各种花式强制广播。

    b)这里就需要更多信息来判断了,6G是从哪里得到的?Spark UI的DAG图吗?sort阶段的6G,仅仅是DIM表的大小?可以加我微信“方块K”或是“rJunior”,把完整的DAG贴给我~

    2021-06-11 15:34:17

  • Geek_d794f8

    2021-04-10 17:20:52

    老师有两个问题请教一下:
    1.文中提到两个表join,两个表数据量相差很大呀,为什么他们的的分区数是一致的,而且分区数不是根据hadoop的切片规则去划分的吗?
    2.广播join不是默认开启的吗,好像小表默认10M;还需像文中代码val bcUserDF = broadcast(userDF)这样声明吗?
    希望得到您的指导,多谢!
    作者回复

    思考的很深入,👍赞一个~

    1. 我理解,你问的是这句吧?“由于左右表的分区数是一致的,因此 Shuffle 过后,一定能够保证 userID 相同的交易记录和用户数据坐落在同一个 Executors 内。” HadoopRDD的分区数、或者说并行度,确实是由HDFS文件系统决定的;但是,Shuffle过后,每个分布式数据集的并行度,就由参数spark.sql.shuffle.partitions来决定了,这个咱们在配置项哪一讲说过哟~ 因此,如果你没有手工用repartition或是Coalesce去调整并行度,默认情况下,大家Shuffle过后(在Reduce阶段)都是这个并行度。

    2. 默认确实是开启的,默认值确实也是10MB,但是,这个10MB太太太太太太(太 x N)小了!很多小表其实都超过了这个阈值,因此,如果你懒得去调整这个参数,可以直接用broadcast(userDF)这种强制广播的方式,省时省力,比较方便~

    2021-04-12 17:41:09

  • 斯盖丸

    2021-05-05 11:23:04

    老师我生产中为啥从没有遇到过10000并行度那么大的stage,可能我公司比较小吧,集群最多才100多个核,多数时才几百个任务,最多时也才2000多个任务。这健康吗?
    作者回复

    这里其实有两个容易混淆的概念哈~

    一个是并行度,并行度其实是从数据角度出发,表示的是你的分布式数据集划分的粒度,再直白点说,它和分区数是等效的。因此,它其实跟你集群有多少Executors,每个Executors配置了多少cores,没有关系~

    第二个是并发度,或者叫Executors线程池大小,也就是你用spark.executor.cores类似的参数,给Executors指定的cores资源。它限制了在同一时间,你的Executors中最多同时能跑多少个任务。Executors并发度乘以集群中的Executors数量,其实就是你集群的并发处理能力,很多地方也叫并行处理能力。其实蛋疼的地方在于,不同的作者、不同的上下文,并发和并行这两个词,总是混用。所以也就造成大家都比较困惑。

    咱们在配置项第一讲,其实就在尝试厘清、约定这两个词的定义,一来方便大家理解,二来方便后续讨论。

    所以,回答你的问题,其实没什么不健康的哈~ 10000并行度,意味着10000个分区的分布式数据集,这个应该不难见到。另外100个cores的集群,其实也不算小了~ 不过你说的2000任务我没有get到,不知道是2000并行度,还是2000的集群并发。如果是2000集群并发的话,这个数和100cores对不上。这意味着你的每个core需要20个超线程,哈哈,目前还没有这么给力的CPU。一般CPU也就2个超线程。

    2021-05-05 23:09:56

  • 斯盖丸

    2021-04-09 20:00:37

    原来小表和大表join是节省了大表的shuffle,不然大表只能根据join的列在所有机器上重新分布一遍,现在懂了
    作者回复

    是的,以小博大

    2021-04-10 19:59:52

  • 狗哭

    2021-10-21 19:59:58

    select * from
    (select id from table1) a -- 结果很大
    left join
    (select id from table2) b -- 结果很小
    on t1.id = t2.id;
    老师请教下,这种情况b表会广播吗?如果不会怎么处理能让其广播出去呢
    作者回复

    能否广播,取决于b表的存储大小,是否小于广播阈值,也就是:spark.sql.autoBroadcastJoinThreshold。如果小于这个阈值,就会广播,否则就不会。

    如果懒得设阈值,还可以利用 API 强制广播,这里的具体细节,可以参考第13讲哈,就是后面的一讲~ 会详细说,怎么把Shuffle Join,转化为Broadcast Join

    2021-10-22 16:07:02

  • 一只菜🐶

    2024-07-14 12:44:43

    老师,想问下生产中会碰到广播变量超过阈值然后导致driver oom但实际被广播的表很小 比如1mb driver内存一般设置在5g-10g之间 解决方案就是将spark.sql.autoBroadcastJoinThreshold=-1就可以避免oom 不太清楚原因 麻烦老师解答
  • 光羽隼

    2024-06-21 16:08:37

    如果通过Driver获取小表的数据分布信息,然后选取其中一个Executor收集所有的数据,然后再通过这个Executor将数据分发到所有的Executor。这样网络开销会小很多吧。。。。
    这种让其中一个Executor收集分发的方式和Driver收集再分发的方式没区别,只是Driver收集小表全部数据可能会有内存的压力,但是Executor最后反正是需要小表全部的数据,那么提前让Executor来承担这个收集的工作,应该不会造成内存压力。
  • 有风的夏天

    2022-11-01 02:26:14

    请问下,executor数 还有核数,与并行度设置的关系,您在实际应用中是怎么设置的
  • 陌生的心酸

    2022-02-21 23:31:30

    1>.当数据量比较大,对数据进行广播后,同时还要接受各个Executor的中间结果上报,状态管理,导致网络繁忙,继而会发生分发任务到Executor产生失败

    2> 两个大表【超过广播变量的阈值参数设置】进行join,数据需要分发多次,效率不佳
    作者回复

    可以参考Bennan同学的答案哈:

    1. P2P思路:改成由driver获取到数据分布,然后通知各个executor之间进行拉取,这样可以利用多个executor网络,避免只有driver组装以后再一个一个发送效率过低

    2.当两个需要join的数据集都很大时,使用broadcast join需要将一个很大的数据集进行网络分发多次,已经远超出了shuffle join需要传输的数据

    2022-02-26 23:36:23

  • 子兮

    2021-11-10 16:43:45

    老师您好,在整个应用资源较为紧张,数据量较大的情况下:spark core计算过程中,生成一个较大的RDD , 它被引用一次,但我还是对它persist(disk),我在用完并且动作算子后,立刻对它进行了释放unpersist,这样操作是否能加快spark 对这个rdd 的清理,加快内存的释放,缓解内存压力?如果是persist(memory and disk),用完并且在动作算子后立即释放unpersist,是否能缓解内存压力?如果不persist,用完并且在动作算子后立即释放unpersist,是否能缓解内存压力? 文字有些长,希望没给老师造成困扰,谢谢老师
    作者回复

    关于Cache(Persist)的部分,建议老弟关注第16讲:内存视角(二):如何有效避免Cache滥用?这一讲,比较系统、细致地介绍了Cache的使用场景、原则,和一般注意事项,尤其是什么时候该Cache,什么时候不能滥用Cache,老弟可以先看看哈~

    2021-11-10 22:58:48

  • 子兮

    2021-11-10 16:20:29

    1 老师,您好,spark core计算过程中,需要频繁的使用broadCast 操作, 这样累计几次后driver 端内存会很有压力,怎样设置参数或者手动清除之前用来broadCast 的数据?谢谢老师
    2 老师这个课程的微信交流群怎么添加呢?期待加入学习

    作者回复

    先说问题2)老弟可以加我微信,搜索“方块K”,或是“rJunior”,我把你拉进去

    再来说1)从两个角度来说:

    首先, driver创建完广播变量、分发给Executors之后,广播变量在Driver端其实是没有引用的,所以随着时间流逝,这些在Driver端创建的广播变量,会慢慢地被GC掉,所以说,不存在长期积压导致内存不断消耗的问题~

    再一个,即便广播变量没有被GC,那么出现你说的那种情况,前提也是你在应用中创建了很多的广播变量。一般来说,广播变量确实可以改善Shuffle join,但是,滥用也不行,适得其反。就是说,我们不能可着一只羊薅羊毛,一个应用中,如果广播变量非常得多,尤其是大数据集上的广播变量,那我们应该考虑,是否应用的实现方式是合理的,有没有其他途径,实现同样的业务逻辑

    2021-11-10 22:55:07

  • 子兮

    2021-11-02 23:41:56

    老师, 课程里您讲了内连接左右左右两表的连接过程,您能否再讲一下左外连接leftoutjoin 的呢?对shuffle过程不太清晰,谢谢老师
    作者回复

    其实跟inner相比,left只是在形式上不同哈,实际在运行时的执行过程,都是一样的,所以凡是适合inner的优化方法,left、right、semi、anti,都适用的~

    2021-11-05 14:39:39

  • zhongmin

    2021-09-06 14:27:00

    吴老师,问个问题,在广播分布式变量的时候,如果变量的内容发生改变,是怎么去做变量的同步和更新呢?
    作者回复

    一般来说,广播变量封装的,都是immutable data,就是不可变数据(集)。如果封装的变量值有变化,那么就需要创建新的广播变量,之前创建的广播变量所携带的数据内容,是不会自动跟着更新的~

    2021-09-06 22:10:14

  • wow_xiaodi

    2021-08-01 22:06:53

    老师对于这节有一些问题:
    (1)executor对于待回传driver端的广播数据集是先存在内存还是落地硬盘呢?
    (2)由executor来处理分片再回传,可能分片需要进行一定的计算再由driver汇总广播,那么如果是无需计算的原始分片呢,driver可否亲自操刀读取所有原始分片直接汇总再分发呢,感觉这样可以节省网络开销?
    作者回复

    好问题~

    1)优先放内存,空间不足放磁盘

    2)一来实际上并没有节省网络开销(不妨仔细想想为什么),再者这样的应用场景太有限了。这种case只适用于那种数据刚读进来就广播,场景太有限。

    2021-08-04 09:56:34

  • xuchuan

    2021-05-18 09:08:46

    2.多个大表join应该就没法用广播解决,这个延伸一个问题,以电商为例,即席查询应该还要有数仓吧,这不是spark的主要覆盖范畴。
    作者回复

    这块其实涉及到Join的执行过程,不论查询语句中本身有多少个Join,在运行时,传统的DBMS也好,Spark SQL也好,实际上都是两张表、两张表地做Join。比如说,有个查询,a join b,b又join c,在运行时,a join b会生成临时结果,比如叫xxx,那么后续与c做关联的,不再是b,而是这个xxx。因此,与c的关联是否能做Broadcast Join的转化,取决于xxx的中间大小,而不是b的大小。

    数仓这个是个好问题,通常来说,Spark需要额外的第三方组件来管理数仓元数据,因此,可以考虑用Hive构建数仓的存储层,而用Spark做计算层。

    2021-05-19 16:46:25

  • Geek_d794f8

    2021-05-11 20:31:09

    磊哥,为什么dataframe可以广播,而rdd不能广播。dataframe也不存数据呀,它只是比rdd 多了schema信息。这块不太理解。
    作者回复

    两者都可以广播哈~ 他们都是分布式数据集,你说的对,DataFrame只是多了个Schema而已,分布式数据集都是可以广播的~

    文中举例多用DataFrame,RDD用得少,原因在于DataFrame API目前是主流,它能享受到Spark SQL带来的种种优化机制。

    但像广播这种优化机制,二者都可以的。

    2021-05-12 13:36:49

  • 弦断觅知音

    2021-05-07 10:58:03

    请教老师第一个问题: 用什么方式 改为由driver获取到数据分布,然后通知各个executor之间进行拉取?
    作者回复

    这里我原文没有说清楚,是我的锅哈。这里广播变量的创建和使用,其实分两个步骤:

    1. 广播变量创建:第一个是Driver从所有Executors收集数据,构建广播变量,也就是全量数据,这个过程Driver是瓶颈;

    2. 广播变量使用/读取:第二个是各个Executors再去拉取广播变量,这个过程现在只有TorrentBroadcast这一种实现方式。这个时候,Executors之间用类似“P2P”的方式去拉取广播变量的每一个block。优先从本机的其他Executors拉取、其次本Rack的其他机器去拉取,这里有点类似任务本地性,总之对于target block,给定block地址列表,BlockManager会优先选取拉取成本最低的那个去尝试。之所有说是类似“P2P”,原因在于它拉取的形式是P2P,但是这里并没有什么P2P协议,而是利用BlockManager来实现定向拉取。BlockManager从BlockManagerMaster获取目标block地址列表,然后按照刚刚说的顺序,依次尝试拉取Block。

    2021-05-12 13:33:54