05 | 调度系统:“数据不动代码动”到底是什么意思?

你好,我是吴磊。

在日常的开发与调优工作中,为了充分利用硬件资源,我们往往需要手工调节任务并行度来提升CPU利用率,控制任务并行度的参数是Spark的配置项:spark.default.parallelism。增加并行度确实能够充分利用闲置的CPU线程,但是,parallelism数值也不宜过大,过大反而会引入过多的调度开销,得不偿失。

这个调优技巧可以说是老生常谈了,网上到处都可以搜得到。那你知道为什么parallelism数值过大调度开销会呈指数级增长吗?调度开销具体又是指什么呢?以及,如果不想一个数值一个数值的尝试,parallelism数值究竟该怎么设置,才能以最少的时间获得最好的效果?如果你还没有答案,或者说还没有把握答对,接下来你就要好好听我讲。

这一讲,我会通过一个机器学习案例,来和你一起聊聊调度系统是什么,它是怎么工作的,从而帮助你摆脱调优总是停留在知其然、不知其所以然的尴尬境地。

案例:对用户兴趣特征做Label Encoding

在机器学习应用中,特征工程几乎占据了算法同学80%的时间和精力,毕竟,一份质量优良的训练样本限定了模型效果的上限和天花板,我们要讲的案例就来自特征工程中一个典型的处理场景:Label Encoding(标签编码)。

什么是Label encoding呢?模型特征按照是否连续可以分为两类:连续性数值特征和离散型特征,离散型特征往往以字符串的形式存在,比如用户兴趣特征就包括体育、政治、军事和娱乐等。对于很多机器学习算法来说,字符串类型的数据是不能直接消费的,需要转换为数值才行,例如把体育、政治、军事、娱乐映射为0、1、2、3,这个过程在机器学习领域有个术语就叫Label encoding。

我们这一讲的案例,就是要对用户兴趣特征做Label encoding,简单来说就是以固定的模板把字符串转换为数值,然后将千亿条样本中的用户兴趣转换为对应的索引值。固定模板是离线模型训练与线上模型服务之间的文件接口,内容仅包含用户兴趣这一列,字符串已按事先约定好的规则进行排序。我们需要注意的是,用户兴趣包含4个层级,因此这个模板文件较大,记录数达到万级别。

//模板文件
//用户兴趣
体育-篮球-NBA-湖人
军事-武器-步枪-AK47

那具体怎么转换呢?例如,我们可以将用户兴趣“体育-篮球-NBA-湖人”映射为0,将兴趣“军事-武器-步枪-AK47”映射为1,以此类推。应该说,需求还是相当明确的,我身边的同学们拿到需求之后,奔儿都没打,以迅雷不及掩耳之势就实现了如下的处理函数。

/**
实现方式1
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
 
//函数定义
def findIndex(templatePath: String, interest: String): Int = {
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
searchMap.getOrElse(interest, -1)
}
 
//Dataset中的函数调用
findIndex(filePath, "体育-篮球-NBA-湖人")

我们可以看到这个函数有两个形参,一个是模板文件路径,另一个是训练样本中的用户兴趣。处理函数首先读取模板文件,然后根据文件中排序的字符串构建一个从兴趣到索引的Map映射,最后在这个Map中查找第二个形参传入的用户兴趣,如果能找到则返回对应的索引,找不到的话则返回-1。

这段代码看上去似乎没什么问题,同学们基于上面的函数对千亿样本做Label encoding,在20台机型为C5.4xlarge AWS EC2的分布式集群中花费了5个小时。坦白说,这样的执行性能,我是不能接受的。你可能会说:“需求就是这个样子,还能有什么别的办法呢?”我们不妨来看另外一种实现方式。

/**
实现方式2
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
 
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
 
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")

同学们基于第二种方式对相同的数据集做Label encoding之后,在10台同样机型的分布式集群中花了不到20分钟就把任务跑完了。可以说,执行性能的提升是显而易见的。那么,两份代码有什么区别呢?

我们可以看到,相比于第一份代码,第二份代码的函数体内没有任何变化,还是先读取模板文件、构建Map映射、查找用户兴趣,最后返回索引。最大的区别就是第二份代码对高阶函数的使用,具体来说有2点:

  1. 处理函数定义为高阶函数,形参是模板文件路径,返回结果是从用户兴趣到索引的函数;
  2. 封装千亿样本的Dataset所调用的函数,不是第一份代码中的findIndex,而是用模板文件调用findIndex得到的partFunc,partFunc是形参为兴趣、结果为索引的普通标量函数。

那么,高阶函数真有这么神奇吗?其实,性能的提升并不是高阶函数的功劳,而是调度系统在起作用。

Spark的调度系统是如何工作的?

Spark调度系统的核心职责是,先将用户构建的DAG转化为分布式任务,结合分布式集群资源的可用性,基于调度规则依序把分布式任务分发到执行器。这个过程听上去就够复杂的了,为了方便你理解,我们还是先来讲一个小故事。

土豆工坊流水线升级

在学完了内存计算的第二层含义之后,土豆工坊的老板决定对土豆加工流水线做升级,来提高工坊的生产效率和灵活性。

这里,我们先对内存计算的第二层含义做个简单地回顾,它指的是同一Stage中的所有操作会被捏合为一个函数,这个函数一次性会被地应用到输入数据上,并且一次性地产生计算结果

升级之前的土豆加工流程DAG被切分为3个执行阶段Stage,它们分别是Stage 0、Stage 1、Stage 2。其中,Stage 0产出即食薯片,Stage 1分发调味品,Stage 2则产出不同尺寸、不同风味的薯片。我们重点关注Stage 0,Stage 0有3个加工环节,分别是清洗、切片和烘焙。这3个环节需要3种不同的设备,即清洗机、切片机和烤箱。

工坊有3条流水线,每种设备都需要3套,在成本方面要花不少钱呢,因此工坊老板一直绞尽脑汁想把设备方面的成本降下来。

此时,工头儿建议:“老板,我听说市场上有一种可编程的土豆加工设备,它是个黑盒子并且只有输入口和输出口,从外面看不见里面的操作流程。不过黑盒子受程序控制,给定输入口的食材,我们可以编写程序控制黑盒子的输出。有了这个可编程设备,咱们不但省了钱,将来还可以灵活地扩充产品线。比方想生产各种风味的薯条或是土豆泥,只需要更换一份程序加载到黑盒子里就行啦!”

老板听后大喜,决定花钱购入可编程土豆加工设备,替换并淘汰现有的清洗机、切片机和烤箱。

于是,工坊的加工流水线就变成了如下的样子。工人们的工作也从按照DAG流程图的关键步骤,在流水线上安装相应的设备,变成了把关键步骤编写相应的程序加载到黑盒内。这样一来,这家工坊的生产力也从作坊式的生产方式,升级到了现代化流水线的作业模式。

那么,这个故事跟我们今天要讲的调度系统有什么关系呢?事实上,Spark调度系统的工作流程包含如下5个步骤:

1. 将DAG拆分为不同的运行阶段Stages;
2. 创建分布式任务Tasks和任务组TaskSet;
3. 获取集群内可用硬件资源情况;
4. 按照调度规则决定优先调度哪些任务/组;
5. 依序将分布式任务分发到执行器Executor。

除了第4步以外,其他几步和土豆工坊流水线上的关键步骤都是一一对应的,它们的对应关系如下:

现在,你可能会觉得用故事来记这几个步骤好像多此一举,但当我们学完了所有的原理之后,再回过头来把故事的主线串联起来,你就会惊喜地发现,所有的原理你都能轻松地记住和理解,这可比死记硬背的效率要高得多。

调度系统中的核心组件有哪些?

接下来,我们深入到流程中的每一步去探究Spark调度系统是如何工作的。不过在此之前,我们得先弄清楚调度系统都包含哪些关键组件,不同组件之间如何交互,它们分别担任了什么角色,才能更好地理解流程中的每一步。

Spark调度系统包含3个核心组件,分别是DAGScheduler、TaskScheduler和SchedulerBackend。这3个组件都运行在Driver进程中,它们通力合作将用户构建的DAG转化为分布式任务,再把这些任务分发给集群中的Executors去执行。不过,它们的名字都包含Scheduler,光看名字还真是丈二和尚摸不着头脑,所以我把它们和调度系统流程中5个步骤的对应关系总结在了下表中,你可以看一看。

1. DAGScheduler

DAGScheduler的主要职责有二:一是把用户DAG拆分为Stages,如果你不记得这个过程可以回顾一下上一讲的内容;二是在Stage内创建计算任务Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑。然后,执行器Executors接收到Tasks,会将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程。

不过,如果我们给集群中处于繁忙或者是饱和状态的Executors分发了任务,执行效果会大打折扣。因此,在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去。那么,调度系统是怎么判断节点是否空闲的呢?

2. SchedulerBackend

SchedulerBackend就是用来干这个事的,它是对于资源调度器的封装与抽象,为了支持多样的资源调度模式如Standalone、YARN和Mesos,SchedulerBackend提供了对应的实现类。在运行时,Spark根据用户提供的MasterURL,来决定实例化哪种实现类的对象。MasterURL就是你通过各种方式指定的资源管理器,如--master spark://ip:host(Standalone 模式)、--master yarn(YARN 模式)。

对于集群中可用的计算资源,SchedulerBackend会用一个叫做ExecutorDataMap的数据结构,来记录每一个计算节点中Executors的资源状态。ExecutorDataMap是一种HashMap,它的Key是标记Executor的字符串,Value是一种叫做ExecutorData的数据结构,ExecutorData用于封装Executor的资源状态,如RPC地址、主机地址、可用CPU核数和满配CPU核数等等,它相当于是对Executor做的“资源画像”。

总的来说,对内,SchedulerBackend用ExecutorData对Executor进行资源画像;对外,SchedulerBackend以WorkerOffer为粒度提供计算资源,WorkerOffer封装了Executor ID、主机地址和CPU核数,用来表示一份可用于调度任务的空闲资源。显然,基于Executor资源画像,SchedulerBackend可以同时提供多个WorkerOffer用于分布式任务调度。WorkerOffer这个名字起得蛮有意思,Offer的字面意思是公司给你提供的工作机会,结合Spark调度系统的上下文,就变成了使用硬件资源的机会。

好了,到此为止,要调度的计算任务有了,就是DAGScheduler通过Stages创建的Tasks;可用于调度任务的计算资源也有了,即SchedulerBackend提供的一个又一个WorkerOffer。如果从供需的角度看待任务调度,DAGScheduler就是需求端,SchedulerBackend就是供给端。

3. TaskScheduler

左边有需求,右边有供给,如果把Spark调度系统看作是一个交易市场的话,那么中间还需要有个中介来帮它们对接意愿、撮合交易,从而最大限度地提升资源配置的效率。在Spark调度系统中,这个中介就是TaskScheduler。TaskScheduler的职责是,基于既定的规则与策略达成供需双方的匹配与撮合

显然,TaskScheduler的核心是任务调度的规则和策略,TaskScheduler的调度策略分为两个层次,一个是不同Stages之间的调度优先级,一个是Stages内不同任务之间的调度优先级

首先,对于两个或多个Stages,如果它们彼此之间不存在依赖关系、互相独立,在面对同一份可用计算资源的时候,它们之间就会存在竞争关系。这个时候,先调度谁、或者说谁优先享受这份计算资源,大家就得基于既定的规则和协议照章办事了。

对于这种Stages之间的任务调度,TaskScheduler提供了2种调度模式,分别是FIFO(先到先得)和FAIR(公平调度)。 FIFO非常好理解,在这种模式下,Stages按照被创建的时间顺序来依次消费可用计算资源。这就好比在二手房交易市场中,两个人同时看中一套房子,不管两个人各自愿意出多少钱,谁最先交定金,中介就优先给谁和卖家撮合交易。

你可能会说:“这不合常理啊!如果第二个人愿意出更多的钱,卖家自然更乐意和他成交。”没错,考虑到开发者的意愿度,TaskScheduler提供了FAIR公平调度模式。在这种模式下,哪个Stages优先被调度,取决于用户在配置文件fairscheduler.xml中的定义。

在配置文件中,Spark允许用户定义不同的调度池,每个调度池可以指定不同的调度优先级,用户在开发过程中可以关联不同作业与调度池的对应关系,这样不同Stages的调度就直接和开发者的意愿挂钩,也就能享受不同的优先级待遇。对应到二手房交易的例子中,如果第二个人乐意付30%的高溢价,中介自然乐意优先撮合他与卖家的交易。

说完了不同Stages之间的调度优先级,我们再来说说同一个Stages内部不同任务之间的调度优先级,Stages内部的任务调度相对来说简单得多。当TaskScheduler接收到来自SchedulerBackend的WorkerOffer后,TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发。众所周知,本地性级别有4种:Process local < Node local < Rack local < Any。从左到右分别是进程本地性、节点本地性、机架本地性和跨机架本地性。从左到右,计算任务访问所需数据的效率越来越差。

进程本地性表示计算任务所需的输入数据就在某一个Executor进程内,因此把这样的计算任务调度到目标进程内最划算。同理,如果数据源还未加载到Executor进程,而是存储在某一计算节点的磁盘中,那么把任务调度到目标节点上去,也是一个不错的选择。再次,如果我们无法确定输入源在哪台机器,但可以肯定它一定在某个机架上,本地性级别就会退化到Rack local。

DAGScheduler划分Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是Executor进程ID。换句话说,任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去

既然计算任务的个人意愿这么强烈,TaskScheduler作为中间商,肯定要优先满足人家的意愿。这就像一名码农想要租西二旗的房子,但是房产中介App推送的结果都是东三环国贸的房子,那么这个中介的匹配算法肯定有问题。

由此可见,Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。毕竟,分发代码的开销要比分发数据的代价低太多,这也正是“数据不动代码动”这个说法的由来。

总的来说,TaskScheduler根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给SchedulerBackend,SchedulerBackend根据ExecutorData中记录的RPC地址和主机地址,再将序列化的任务通过网络分发到目的主机的Executor中去。最后,Executor接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。

性能调优案例回顾

知道了调度系统是如何工作的,我们就可以回过头来说说开头Label encoding的开发案例中,2种实现方式的差别到底在哪儿了。我们先来回顾案例中处理函数的主要计算步骤:

  1. 读取并遍历模板文件内容,建立从字符串到数值的字典;
  2. 根据样本中的用户兴趣,查找字典并返回兴趣字符串对应的数值索引。

2种实现方式的本质区别在于,函数中2个计算步骤的分布式计算过程不同。在第1种实现方式中,函数是一个接收两个形参的普通标量函数,Dataset调用这个函数在千亿级样本上做Label encoding。

在Spark任务调度流程中,该函数在Driver端交由DAGScheduler打包为Tasks,经过TaskScheduler调度给SchedulerBackend,最后由SchedulerBackend分发到集群中的Executors中去执行。这意味着集群中的每一个Executors都需要执行函数中封装的两个计算步骤,要知道,第一个步骤中遍历文件内容并建立字典的计算开销还是相当大的。

反观第2种实现方式,2个计算步骤被封装到一个高阶函数中。用户代码先在Driver端用模板文件调用这个高阶函数,完成第一步计算建立字典的过程,同时输出一个只带一个形参的标量函数,这个标量函数内携带了刚刚建好的映射字典。最后,Dataset将这个标量函数作用于千亿样本之上做Label encoding。

发现区别了吗?在第2种实现中,函数的第一步计算只在Driver端计算一次,分发给集群中所有Executors的任务中封装的是携带了字典的标量函数。然后在Executors端,Executors在各自的数据分片上调用该函数,省去了扫描模板文件、建立字典的开销。最后,我们只需要把样本中的用户兴趣传递进去,函数就能以O(1)的查询效率返回数值结果。

对于一个有着成百上千Executors的分布式集群来说,这2种不同的实现方式带来的性能差异还是相当可观的。因此,如果你能把Spark调度系统的工作原理牢记于心,我相信在代码开发或是review的过程中,你都能够意识到第一个计算步骤会带来的性能问题。这种开发过程中的反思,其实就是在潜移默化地建立以性能为导向的开发习惯

小结

今天这一讲,我们先通过一个机器学的案例对比了2种实现方式的性能差异,知道了对于调度系统一知半解,很有可能在开发过程中引入潜在的性能隐患。为此,我梳理了调度系统工作流程的5个主要步骤:

  1. 将DAG拆分为不同的运行阶段Stages;
  2. 创建分布式任务Tasks和任务组TaskSet;
  3. 获取集群内可用硬件资源情况;
  4. 按照调度规则决定优先调度哪些任务/组;
  5. 依序将分布式任务分发到执行器Executor;

结合这5个步骤,我们深入分析了Spark调度系统的工作原理,我们可以从核心职责和核心原则这两方面来归纳:

  1. Spark调度系统的核心职责是,先将用户构建的DAG转化为分布式任务,结合分布式集群资源的可用性,基于调度规则依序把分布式任务分发到执行器Executors;
  2. Spark调度系统的核心原则是,尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方(Executors或计算节点),从而最大限度地降低分布式系统中的网络开销。

每日一练

  1. DAGScheduler在创建Tasks的过程中,是如何设置每一个任务的本地性级别?
  2. 在计算与存储分离的云计算环境中,Node local本地性级别成立吗?你认为哪些情况下成立?哪些情况下不成立?

期待在留言区看到你的思考和答案,如果你的朋友也正急需搞清楚调度系统的工作原理,也欢迎你把这一讲转发给他,我们下一讲见!

精选留言

  • 慢慢卢

    2021-06-19 06:44:07

    任务调度的时候不考虑可用内存大小吗
    作者回复

    好问题~ 我们分资源调度和任务调度两种情况来说。

    Spark在做任务调度之前,SchedulerBackend封装的调度器,比如Yarn、Mesos、Standalone,实际上已经完成了资源调度,换句话说,整个集群有多少个containers/executors,已经是一件确定的事情了。而且,每个Executors的CPU和内存,也都是确定的了(因为你启动Spark集群的时候,使用配置项指定了每个Executors的CPU和内存分别是多少)。资源调度器在做资源调度的时候,确实是同时需要CPU和内存信息的。

    资源调度完成后,Spark开始任务调度,你的问题,其实是任务调度范畴的问题。也就是TaskScheduler在准备调度任务的时候,要事先知道都有哪些Executors可用,注意,是可用。也就是TaskScheduler的核心目的,在于获取“可用”的Executors。

    现在来回答你的问题,也就是:为什么ExecutorData不存储于内存相关的信息。答案是:不需要。一来,TaskScheduler要达到目的,它只需知道Executors是否有空闲CPU、有几个空闲CPU就可以了,有这些信息就足以让他决定是否把tasks调度到目标Executors上去。二来,每个Executors的内存总大小,在Spark集群启动的时候就确定了,因此,ExecutorData自然是没必要记录像Total Memory这样的冗余信息。

    再来说Free Memory,首先,我们说过,Spark对于内存的预估不准,再者,每个Executors的可用内存都会随着GC的执行而动态变化,因此,ExecutorData记录的Free Memory,永远都是过时的信息,TaskScheduler拿到这样的信息,也没啥用。一者是不准,二来确实没用,因为TaskScheduler拿不到数据分片大小这样的信息,TaskScheduler在Driver端,而数据分片是在目标Executors,所以TaskScheduler拿到Free Memory也没啥用,因为它也不能判断说:task要处理的数据分片,是不是超过了目标Executors的可用内存。

    综上,ExecutorData的数据结构中,只保存了CPU信息,而没有记录内存消耗等信息。不知道这些能不能解答你的问题?有问题再聊哈~

    2021-06-21 18:28:37

  • Geek_d794f8

    2021-03-25 21:00:21

    老师,数据尽量不动,比如有部分数据在节点A,那么移动计算难道不是要在节点A上启动Excutor才可以进行计算吗?但是Excutor不是在申请资源的时候就确定了在哪几个节点上启动Excutor吗?老师请指教一下
    作者回复

    好问题,是这样的,资源调度和任务调度是分开的。

    资源调度主要看哪些节点可以启动executors,是否能满足executors所需的cpu数量要求,这个时候,不会考虑任务、数据本地性这些因素。

    资源调度完成之后,在任务调度阶段,spark负责计算每个任务的本地性,效果就是task明确知道自己应该调度到哪个节点,甚至是哪个executors。最后scheduler Backend会把task代码,分发到目标节点的目标executors,完成任务调度,实现数据不动代码动。

    所以,二者是独立的,不能混为一谈哈~

    2021-03-26 09:26:45

  • L3nvy

    2021-03-24 16:34:37

    1.
    位置信息通过特定的字符串前缀格式标识
    executor_[hostname]_[executorid]
    [hostname]
    hdfs_cache_[hostname]

    DAGScheduler会尝试获取RDD的每个Partition的偏好位置信息,a.如果RDD被缓存,通过缓存的位置信息获取每个分区的位置信息;b.如果RDD有preferredLocations属性,通过preferredLocations获取每个分区的位置信息;c. 遍历RDD的所有是NarrowDependency的父RDD,找到第一个满足a,b条件的位置信息

    DAGScheduler将生成好的TaskSet提交给TaskSetManager进行任务的本地性级别计算

    2.
    感觉像是Spark on Kubernetes这种场景
    应该和相关存储配置有关;不太了解,猜想的话。如果是配置的Spark中间过程使用的存储是分布式存储,Node Local应该不成立;如果就是单个容器的内部空间,或者挂载到主机上的空间,应该可以成立
    作者回复

    第一题给满分💯,看答案就知道认真去读源码了,赞一个~ 第二题也答对了。第二题,再想想,还有其他cases吗?

    2021-03-24 18:59:26

  • 斯盖丸

    2021-04-18 18:38:15

    老师,我这是二刷您的课程了,但我想说课程的例子没看懂。第二种用部分函数的例子里,是节约了哪步操作呢?读文件应该只要Driver读一次就够了。但是zipWithIndex生成的map呢,由于没有把它广播出去,那应该还是每个task都会被拷贝一份全量的map吧。我这样的理解对吗?如果是对的,那感觉性能提升也不应该那么明显吧…
    作者回复

    非常好,思考的很深入~ 你说的没错,由于map没有用广播,所以每个task都会携带这个map,有额外的网络和内存存储开销。

    但是,你要看跟谁比,跟广播比,第二种写法确实不如广播,但如果你跟第一种实现比,性能提升会非常明显。在第一种实现下,task不会携带map,而是在Executor临时去读文件、临时创建那个map,这个重复的计算开销,远大于task分发携带map带来的网络和内存开销。

    简言之, 你说的非常对,不过咱们这一讲要强调的关键是调度系统,因此后来并没有用广播进一步优化,讲道理来说,一定是广播的实现方式是最优的。

    2021-04-18 23:20:59

  • 小学生敬亭山

    2021-03-26 18:35:09

    老师正例这个,先建map,再broadcast map 是不是一样的逻辑
    作者回复

    好思路,完全没问题~

    2021-03-27 09:31:52

  • wow_xiaodi

    2021-07-20 15:54:29

    老师,请问对于第一种函数的写法和调用,为何是每个executor只处理一次,而不是对RDD里的每一条数据都去运行一遍函数,然后都加载一次map呢?请问这个函数在spark内核里如何解析和运作的呢,他如何知道里面有个map只去初始化一次,而不是每条数据都运行一次呢?
    作者回复

    非常好的问题~ 你说的是对的,这里是我没有说清楚。不管是实现方式1、还是实现方式2,都是RDD里的每一条数据都去运行一次函数。原文中“这意味着集群中的每一个 Executors 都需要执行函数中封装的两个计算步骤”这个表述,是不准确的。准确的说法,应该是像你说的,每条数据,都需要执行这两个计算步骤。感谢老弟的提醒和纠正~ 你说的是对的,就按你自己的思路来理解就好~

    另外,要想让每个Executor只处理一次,那么咱们就只能依赖广播变量了,只有广播变量才能做到这一点。

    2021-07-28 22:19:35

  • Z宇锤锤

    2021-04-11 22:23:34

    /**
    * Create a TaskLocation from a string returned by getPreferredLocations.
    * These strings have the form executor_[hostname]_[executorid], [hostname], or
    * hdfs_cache_[hostname], depending on whether the location is cached.
    */ 终于找到了榜一所说的location信息
    作者回复

    666,不容易,赞坚持不懈~ 👍

    2021-04-12 17:42:11

  • 来世愿做友人 A

    2021-03-24 23:41:46

    第一题:因为是为每个 partition 建立一个task,所以在建立task之前,都会获取每个partition的位置偏好信息。首先判断 rdd 是否被缓存过,通过 rddId + splitIndex 组合成 blockId 判断。如果没有,判断preferredLocations,看起来是判断是否 checkpoint 过。如果还没有,向上获取父rdd,如果是窄依赖,循环上面的判断逻辑。这里想问个问题,代码里直到task分发,似乎没有看到关于shuffle的位置偏好。比如中间有个shuffle过程,shuffle结果写在磁盘小文件,是不是下个 stage 的 task 应该发到父 stage 的所在 executor 更合适?目前没看到这个逻辑,想问问老师
    作者回复

    如果你说的是shuffle read阶段的locality,咱们换个角度思考这个问题。shuffle map阶段,每个map task把中间文件写到本地盘。shuffle read阶段,每个reduce task需要从集群的所有节点拉数据,走网络。这个过程是由shuffle的实现机制决定的。

    因此,从reduce视角看过去,所有中间文件都是它的“数据源”,这些数据源,散落在集群的每一个节点,因此,每个reduce task的locality,在最好的情况下,能做到rack local,最差的时候,那就是any。

    正是因为这样,所以我们会强调,竭尽全力避免shuffle。因为它的实现机制,决定了reduce task的计算一定会让数据走网络。

    不知道这么说能不能回答你的问题哈~ 要是我理解错了,就再at我,咱们继续讨论哈~

    2021-03-25 19:53:50

  • 张笑笑

    2021-09-11 17:23:03

    吴老师,您给的这个案例中,第二个实现方式上使用了高阶函数,看了几次,确实还是没明白,为什么使用这种写法,它只在driver端做一次计算?为什么就省去了读取文件,创建字典的开小了,迷惑中...
    作者回复

    其实不用特别在意高阶函数,它的核心作用,其实是创建出一个包含了查询字典的函数对象。真正分发到Executors的代码,是这个包含了字典的函数对象。

    而创建字典的过程,只在Driver端做一次,因为传递给高阶函数第一个参数、生成“带查询字典的函数对象”的动作,只做了一次,所以创建字典的开销,只有在Driver的那一次。

    2021-09-12 23:38:44

  • Fendora范东_

    2021-04-04 10:04:54

    关于任务调度:
    默认情况下,会先调度process local那批tasks;然后依次是node,rack,any。

    在调度了最契合locality的tasks后还有空闲executor。下一批task本来是有资源可用的,但最适合执行task的executor已被占用,此时会评估下一批tasks等待时间和在空闲executor执行数据传输时间,如果等待时间大于数据传输则直接调度到空闲executor,否则继续等待。

    把wait参数设置为0,则可以不进行等待,有资源时直接调度执行

    这块逻辑一直有点乱。磊哥看下哪有问题嘛?
    作者回复

    没问题,就是locality wait,就是有些task是有调度倾向的,preferredLocations。但是,它想要去的executors,可能正在忙,没有空闲cpu。这个时候两个选择,要么,等executors忙完;要么放弃,调度到其他节点或是executors,退而求其次。locality wait默认3s,但是可以调。不过一般3s就行。除非有些io密集型,必须要node local,这个时候,可以适当调大,多等等。

    其实就是平衡,等待时间和执行时间的平衡,看你具体场景。

    2021-04-04 14:08:29

  • 白音

    2021-03-24 08:32:38

    示例中关于读文件没太理解想请教下老师。
    Source.fromFile 用于读本地文件,所以用spark读文件不是应该用 sc.textFile 来从hdfs目录读取? 或者示例的意思是在跑这段代码之前已经将模板文件分发到了集群每个executor本地吗?
    作者回复

    对,反例的情况,需要把模板文件分发到每个executors,每个executors都需要:1 读文件,建字典;2 在千亿样本上查找字典。

    正例里面,driver读文件、建字典;executors上面,只做第二步。

    2021-03-24 09:43:21

  • October

    2021-03-24 22:03:51

    老师,您提到如果taskScheduler采用Fair调度策略对不同stages进行调度,可以为不同的用户配置不同的调度池,刚开始这个地方有些不理解,同一个应用程序中,怎么会有不同的用户? 于是查了一下官网,官网貌似说的是这里的用户不是程序提交的那个用户,这里的用户对应提交job的一个线程,不知道自己理解的是否正确。 另外,如果我的理解正确的话,在同一个应用程序中使用不同的线程提交job,这个使用方式,我目前还没有见过,请问老师,大概什么场景下,会用不同线程提交job?
    作者回复

    非常好的问题!非常好!通过问题就能看出来,这一篇读的非常认真,赞一个先~ 是这样,你查的没错,首先用户的说法确实容易造成困惑,其实就是应用中多个不同的job,换句话说,就是你应用中的多个actions。开发的时候,你可以把job assign到某个调度池,从而区分不同job在资源争抢上的优先级。

    回答你最后的问题,换一种问法,其实就是什么场景下,一个应用会有多个job。其实这种cases很多,我最先想到的,就是etl,因为etl往往有多个目的,读取多个数据源,输出多种不同阶段的数据,这是常有的事,因此etl至少是一个。

    另外一个,机器学习中的样本过程,通过一个应用生产多个schema的训练样本,这也会涉及到多个job。

    你可以沿着这个思路,再想想还有没有其他cases哈~

    2021-03-25 20:05:55

  • 站在桥上看风景

    2021-08-25 14:34:41

    吴老师,FIFO与FAIR应该是在使用standalone时的情况是吧,如果使用yarn的话资源调度就是FIFO、Capacity Scheduler、Fair Scheduler这三个的选择了是吧
    作者回复

    是的,没错~

    FIFO、FAIR是Standalone的两种调度模式;
    YARN的话,选择更多,就是你说的这几个~

    我这里没有交代清楚,谢谢提醒哈~

    2021-08-27 13:44:40

  • 🚤

    2021-03-30 11:44:41

    如果是我来写Label Encoding的话,在模板数据量不大的情况下,我会第一时间把模板数据转成Map之后广播出去。
    看了老师的正例,感觉其实和广播的意思是差不多的吧
    作者回复

    没错,和广播殊途同归~ 这里咱们为了强调调度系统,所以采用了这种实现方式哈~

    2021-03-30 19:14:13

  • 小学生敬亭山

    2021-03-26 18:11:26

    老师好,为什么第二个示例(也就是正例),建hashmap这个代码就一定是在driver端执行,然后代码再进入调度系统我不是太明白?
    什么时候代码会在driver端执行?
    作者回复

    这一步比较重,从数据处理的最终目的来说,这一步算是开销,放在driver端算一次是最佳策略,避免这个开销在executors断反复计算。

    没有被apply到分布式数据集的计算,都是在driver执行。比如常见的数组、list、字符串,等等。rdd. dataframe. 这些分布式数据集上apply的计算逻辑,才会分发到executors去执行。

    2021-03-27 09:46:04

  • Geek_eba94c

    2022-01-09 18:55:57

    请问文中第二段代码:“//函数定义
    val findIndex: (String) => (String) => Int = ”中,为什么要写两次“(String) => ”?公司刚上产品线,临时学的Scala,求大佬解答,谢谢!
    作者回复

    好问题,Scala的语法,确实比较迷~
    val findIndex: (String) => (String) => Int =
    在这段代码里面,“(String) => (String) => Int”是findIndex这个变量的类型,它的含义是:
    1)首先这个类型,指的是一个函数
    2)函数的输入参数,是一个String的变量,也就是第一个(String)
    3)而函数的输出,是另外一个函数,也就是“(String) => Int”,这个就好理解一些,这个函数输入参数是String的变量,输出是Int的变量

    所以说,findIndex是一个高阶函数,他的输入是String,而输出是一个函数。

    2022-01-13 18:18:54

  • 阳台

    2021-10-28 10:44:53

    老师,spark可以并行提交job吗?比如,现在一个应用有4个job,后三个依赖第一个job的结果,能不能让第一个job结束后,后三个job并行提交到集群里面执行。而不是顺序执行每一个job?
    作者回复

    有的,那就得启用FAIR调度机制,FAIR机制的细节,咱们课程里没有展开,老弟可以参考Spark官网FAIR机制给出的例子就行。

    大概步骤是:
    1)定义资源队列,给不同队列设置权重和优先级
    2)在应用的不同Job中,指定哪些Job属于哪些队列

    然后就可以了,Spark在运行时,会选择不同的资源队列,并行地执行可以并行执行(不存在依赖关系)的Job。

    2021-10-28 20:49:40

  • 陈子

    2021-09-25 23:59:34

    老师,我是 Spark 新手,有个问题请教。对于在 HDFS 上的数据,是由多个 block 组成的,这些 block 及其副本散落在多个节点上,那么 SchedulerBackend 又是如何知道将哪个 Task 调度到哪个节点上来实现 Process Local的呢?对于其他非 HDFS 分布式存储上面的数据,Spark 也可以做到移动计算吗?
    作者回复

    好问题,如果是hdfs,spark通过namenode获取元数据。其他的分布式存储,基本都做不到“代码动”。因为存储和计算是分离的,比如s3

    2021-09-26 19:41:48

  • 薛峰

    2021-09-06 05:34:54

    很有启发,谢谢磊哥,我也想问一下如果用python的话也需要同样的操作么?
    比如
    dic_file=/path/to/dic_file
    def func_lower(dic_file, keyword):
    load dic_file,
    find keyword
    return index


    def func_higher(keyword):
    return func_lower(dic_file,keyword)
    作者回复

    对的,python用闭包也可以做到这一点~ 除了用闭包函数之外,还可以考虑广播变量~

    2021-09-06 22:33:42

  • 果子

    2021-08-08 22:02:00

    吴老师,有个不太理解的地方,stage之间的调度是根据Yarn中的FIFO或FAIRE调度器来调度的,但是stage之间的调度本身就是有顺序的啊,比如ResultStage要依赖于ShuffMapStage,只有ShuffMapStage执行完了才能执行ResultStage,他们两者之间本身就存在依赖关系,串行执行的,为什么还需要调度器呢指定调度规则呢,谁先调度不是本身就规定好的吗?这块我是哪里理解错了吗?
    作者回复

    你的理解没有任何问题,在大多数的Spark应用中,在计算流程上,我们的业务逻辑往往是串行的DAG,这个时候,大部分情况下,Stages之间都是前后串行的依赖关系。这个时候,是FIFO还是FAIR,没有任何区别。

    FIFO、FAIR的主要作用,是给那些有多个Job的应用准备的。对于一些较为复杂、或者说逻辑上比较绕的应用来说,同一个应用,会有多个不同的、地位平等的Job。换句话说,一个DAG上,有多个并列、平行的计算分支。在这种情况下,平行分支上的Stages之间,是不存在依赖关系的,所谓的FIFO、FAIR,是这个时候在起作用。也就是说,是先来的分支先serve,还是每个分支都有公平的机会获取到计算资源,主要是这么个区别。

    那么问题来,平时开发的时候,有哪些场景会需要这种多个Job的应用呢。我能想到的,机器学习的样本工程是一个场景。

    在做特征工程的时候,我们往往需要尝试不同的方法来生成训练样本,比如,组合特征,或是生成特征,等等。对于不同的特征工程,我们想要看他们各自的效果,就得生成多样的训练样本。这个时候,同一份数据,就可能有多个处理分支,每个分支最终生成一份训练样本,用于后续的模型训练。这是一个场景。

    再者,在数仓场景下,同一份源数据,多个不同用户反复查询,就会触发多个Actions/Jobs,这个时候,也涉及到先serve谁、后serve谁的问题,是先到先得,还是FAIR。

    等等,诸如此类,你不妨想想,日常的开发中还有没有其他类似的场景。

    2021-08-13 14:36:04