04 | 进程模型与分布式部署:分布式计算是怎么回事?

你好,我是吴磊。

第2讲的最后,我们留了一道思考题。Word Count的计算流图与土豆工坊的流水线工艺,二者之间有哪些区别和联系?如果你有点记不清了,可以看下后面的图回忆一下。

图片

图片

我们先来说区别。首先,Word Count计算流图是一种抽象的流程图,而土豆工坊的流水线是可操作、可运行而又具体的执行步骤。然后,计算流图中的每一个元素,如lineRDD、wordRDD,都是“虚”的数据集抽象,而流水线上各个环节不同形态的食材,比如一颗颗脏兮兮的土豆,都是“实实在在”的实物。

厘清了二者之间的区别之后,它们之间的联系自然也就显而易见了。如果把计算流图看作是“设计图纸”,那么流水线工艺其实就是“施工过程”。前者是设计层面、高屋建瓴的指导意见,而后者是执行层面、按部就班的实施过程。前者是后者的基石,而后者是前者的具化。

你可能会好奇:“我们为什么非要弄清这二者之间的区别和联系呢?”原因其实很简单,分布式计算的精髓,在于如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。

今天这一讲,我们就来聊一聊,Spark是如何实现分布式计算的。分布式计算的实现,离不开两个关键要素,一个是进程模型,另一个是分布式的环境部署。接下来,我们先去探讨Spark的进程模型,然后再来介绍Spark都有哪些分布式部署方式。

进程模型

在Spark的应用开发中,任何一个应用程序的入口,都是带有SparkSession的main函数。SparkSession包罗万象,它在提供Spark运行时上下文的同时(如调度系统、存储系统、内存管理、RPC通信),也可以为开发者提供创建、转换、计算分布式数据集(如RDD)的开发API。

不过,在Spark分布式计算环境中,有且仅有一个JVM进程运行这样的main函数,这个特殊的JVM进程,在Spark中有个专门的术语,叫作“Driver”。

Driver最核心的作用在于,解析用户代码、构建计算流图,然后将计算流图转化为分布式任务,并把任务分发给集群中的执行进程交付运行。换句话说,Driver的角色是拆解任务、派活儿,而真正干活儿的“苦力”,是执行进程。在Spark的分布式环境中,这样的执行进程可以有一个或是多个,它们也有专门的术语,叫作“Executor”。

我把DriverExecutor的关系画成了一张图,你可以看看:

图片

分布式计算的核心是任务调度,而分布式任务的调度与执行,仰仗的是Driver与Executors之间的通力合作。在后续的课程中,我们会深入讲解Driver如何与众多Executors协作完成任务调度,不过在此之前,咱们先要厘清Driver与Executors的关系,从而为后续的课程打下坚实的基础。

Driver与Executors:包工头与施工工人

简单来看,Driver与Executors的关系,就像是工地上包工头与施工工人们之间的关系。包工头负责“揽活儿”,拿到设计图纸之后负责拆解任务,把二维平面图,细化成夯土、打地基、砌墙、浇筑钢筋混凝土等任务,然后再把任务派发给手下的工人。工人们认领到任务之后,相对独立地去完成各自的任务,仅在必要的时候进行沟通与协调。

其实不同的建筑任务之间,往往是存在依赖关系的,比如,砌墙一定是在地基打成之后才能施工,同理,浇筑钢筋混凝土也一定要等到砖墙砌成之后才能进行。因此,Driver这个“包工头”的重要职责之一,就是合理有序地拆解并安排建筑任务。

再者,为了保证施工进度,Driver除了分发任务之外,还需要定期与每个Executor进行沟通,及时获取他们的工作进展,从而协调整体的执行进度。

一个篱笆三个桩,一个好汉三个帮。要履行上述一系列的职责,Driver自然需要一些给力的帮手才行。在Spark的Driver进程中,DAGScheduler、TaskScheduler和SchedulerBackend这三个对象通力合作,依次完成分布式任务调度的3个核心步骤,也就是:

1.根据用户代码构建计算流图;
2.根据计算流图拆解出分布式任务;
3.将分布式任务分发到Executors中去。

接收到任务之后,Executors调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。对于一个完整的RDD,每个Executors负责处理这个RDD的一个数据分片子集。这就好比是,对于工地上所有的砖头,甲、乙、丙三个工人分别认领其中的三分之一,然后拿来分别构筑东、西、北三面高墙。

好啦,到目前为止,关于Driver和Executors的概念,他们各自的职责以及相互之间的关系,我们有了最基本的了解。尽管对于一些关键对象,如上述DAGScheduler、TaskScheduler,我们还有待深入,但这并不影响咱们居高临下地去理解Spark进程模型。

不过,你可能会说:“一说到模型就总觉得抽象,能不能结合示例来具体说明呢?”接下来,我们还是沿用前两讲展示的Word Count示例,一起去探究spark-shell在幕后是如何运行的。

spark-shell执行过程解析

在第1讲,我们在本机搭建了Spark本地运行环境,并通过在终端敲入spark-shell进入交互式REPL。与很多其他系统命令一样,spark-shell有很多命令行参数,其中最为重要的有两类:一类是用于指定部署模式的master,另一类则用于指定集群的计算资源容量。

不带任何参数的spark-shell命令,实际上等同于下方这个命令:

spark-shell --master local[*]

这行代码的含义有两层。第一层含义是部署模式,其中local关键字表示部署模式为Local,也就是本地部署;第二层含义是部署规模,也就是方括号里面的数字,它表示的是在本地部署中需要启动多少个Executors,星号则意味着这个数量与机器中可用CPU的个数相一致。

也就是说,假设你的笔记本电脑有4个CPU,那么当你在命令行敲入spark-shell的时候,Spark会在后台启动1个Driver进程和3个Executors进程。

那么问题来了,当我们把Word Count的示例代码依次敲入到spark-shell中,Driver进程和3个Executors进程之间是如何通力合作来执行分布式任务的呢?

为了帮你理解这个过程,我特意画了一张图,你可以先看一下整体的执行过程:

图片

首先,Driver通过take这个Action算子,来触发执行先前构建好的计算流图。沿着计算流图的执行方向,也就是图中从上到下的方向,Driver以Shuffle为边界创建、分发分布式任务。

Shuffle的本意是扑克牌中的“洗牌”,在大数据领域的引申义,表示的是集群范围内跨进程、跨节点的数据交换。我们在专栏后续的内容中会对Shuffle做专门的讲解,这里我们不妨先用Word Count的例子,来简单地对Shuffle进行理解。

在reduceByKey算子之前,同一个单词,比如“spark”,可能散落在不用的Executors进程,比如图中的Executor-0、Executor-1和Executor-2。换句话说,这些Executors处理的数据分片中,都包含单词“spark”。

那么,要完成对“spark”的计数,我们需要把所有“spark”分发到同一个Executor进程,才能完成计算。而这个把原本散落在不同Executors的单词,分发到同一个Executor的过程,就是Shuffle。

大概理解了Shuffle后,我们回过头接着说Driver是怎么创建分布式任务的。对于reduceByKey之前的所有操作,也就是textFile、flatMap、filter、map等,Driver会把它们“捏合”成一份任务,然后一次性地把这份任务打包、分发给每一个Executors。

三个Executors接收到任务之后,先是对任务进行解析,把任务拆解成textFile、flatMap、filter、map这4个步骤,然后分别对自己负责的数据分片进行处理。

为了方便说明,我们不妨假设并行度为3,也就是原始数据文件wikiOfSpark.txt被切割成了3份,这样每个Executors刚好处理其中的一份。数据处理完毕之后,分片内容就从原来的RDD[String]转换成了包含键值对的RDD[(String, Int)],其中每个单词的计数都置位1。此时Executors会及时地向Driver汇报自己的工作进展,从而方便Driver来统一协调大家下一步的工作。

这个时候,要继续进行后面的聚合计算,也就是计数操作,就必须进行刚刚说的Shuffle操作。在不同Executors完成单词的数据交换之后,Driver继续创建并分发下一个阶段的任务,也就是按照单词做分组计数。

数据交换之后,所有相同的单词都分发到了相同的Executors上去,这个时候,各个Executors拿到reduceByKey的任务,只需要各自独立地去完成统计计数即可。完成计数之后,Executors会把最终的计算结果统一返回给Driver。

这样一来,spark-shell便完成了Word Count用户代码的计算过程。经过了刚才的分析,对于Spark进程模型、Driver与Executors之间的关联与联系,想必你就有了更清晰的理解和把握。

不过,到目前为止,对于Word Count示例和spark-shell的讲解,我们一直是在本地部署的环境中做展示。我们知道,Spark真正的威力,其实在于分布式集群中的并行计算。只有充分利用集群中每个节点的计算资源,才能充分发挥出Spark的性能优势。因此,我们很有必要去学习并了解Spark的分布式部署。

分布式环境部署

Spark支持多种分布式部署模式,如Standalone、YARN、Mesos、Kubernetes。其中Standalone是Spark内置的资源调度器,而YARN、Mesos、Kubernetes是独立的第三方资源调度与服务编排框架。

由于后三者提供独立而又完备的资源调度能力,对于这些框架来说,Spark仅仅是其支持的众多计算引擎中的一种。Spark在这些独立框架上的分布式部署步骤较少,流程比较简单,我们开发者只需下载并解压Spark安装包,然后适当调整Spark配置文件、以及修改环境变量就行了。

因此,对于YARN、Mesos、Kubernetes这三种部署模式,我们不做详细展开,我把它给你留作课后作业进行探索。今天这一讲,我们仅专注于Spark在Standalone模式下的分布式部署。

为了示意Standalone模式的部署过程,我这边在AWS环境中创建并启动了3台EC2计算节点,操作系统为Linux/CentOS。

需要指出的是,Spark分布式计算环境的部署,对于节点类型与操作系统本身是没有要求和限制的,但是在实际的部署中,请你尽量保持每台计算节点的操作系统是一致的,从而避免不必要的麻烦

接下来,我就带你手把手地去完成Standalone模式的分布式部署。

Standalone在资源调度层面,采用了一主多从的主从架构,把计算节点的角色分为Master和Worker。其中,Master有且只有一个,而Worker可以有一到多个。所有Worker节点周期性地向Master汇报本节点可用资源状态,Master负责汇总、变更、管理集群中的可用资源,并对Spark应用程序中Driver的资源请求作出响应。

为了方便描述,我们把3台EC2的hostname分别设置为node0、node1、node2,并把node0选做Master节点,而把node1、node2选做Worker节点。

首先,为了实现3台机器之间的无缝通信,我们先来在3台节点之间配置无密码的SSH环境:

图片

接下来,我们在所有节点上准备Java环境并安装Spark(其中步骤2的“sudo wget”你可以参考这里的链接),操作命令如下表所示:

图片

在所有节点之上完成Spark的安装之后,我们就可以依次启动Master和Worker节点了,如下表所示:

图片

集群启动之后,我们可以使用Spark自带的小例子,来验证Standalone分布式部署是否成功。首先,打开Master或是Worker的命令行终端,然后敲入下面这个命令:

MASTER=spark://node0:7077 $SPARK_HOME/bin/run-example org.apache.spark.examples.SparkPi

如果程序能够成功计算Pi值,也就是3.14,如下图所示,那么说明咱们的Spark分布式计算集群已然就绪。你可以对照文稿里的截图,验证下你的环境是否也成功了。

图片

重点回顾

今天这一讲,我们提到,分布式计算的精髓在于,如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。而要想透彻理解分布式计算,你就需要掌握Spark进程模型。

进程模型的核心是Driver和Executors,我们需要重点理解它们之间的协作关系。任何一个Spark应用程序的入口,都是带有SparkSession的main函数,而在Spark的分布式计算环境中,运行这样main函数的JVM进程有且仅有一个,它被称为 “Driver”。

Driver最核心的作用,在于解析用户代码、构建计算流图,然后将计算流图转化为分布式任务,并把任务分发给集群中的Executors交付执行。接收到任务之后,Executors调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。

对于一个完整的RDD,每个Executors负责处理这个RDD的一个数据分片子集。每当任务执行完毕,Executors都会及时地与Driver进行通信、汇报任务状态。Driver在获取到Executors的执行进度之后,结合计算流图的任务拆解,依次有序地将下一阶段的任务再次分发给Executors付诸执行,直至整个计算流图执行完毕。

之后,我们介绍了Spark支持的分布式部署模式,主要有Standalone、YARN、Mesos、Kubernetes。其中,Standalone是Spark内置的资源调度器,而YARN、Mesos、Kubernetes是独立的第三方资源调度与服务编排框架。在这些部署模式中,你需要重点掌握Standalone环境部署的操作步骤。

每课一练

好,在这一讲的最后,我给你留两道作业,帮助你巩固今日所学。

1.与take算子类似,collect算子用于收集计算结果,结合Spark进程模型,你能说一说,相比collect算子相比take算子来说都有哪些隐患吗?

2.如果你的生产环境中使用了YARN、Mesos或是Kubernetes,你不妨说一说,要完成Spark在这些独立框架下的分布式部署,都需要哪些必备的步骤?

今天这一讲就到这里了,如果你在部署过程中遇到的什么问题,欢迎你在评论区提问。如果你觉得这一讲帮助到了你,也欢迎你分享给更多的朋友和同事,我们下一讲再见。

精选留言

  • Geek_2dfa9a

    2021-09-17 17:14:39

    1.
    collect,触发action返回全部组分区里的数据,results是个数组类型的数组,最后在把这些数组合并。
    因为所有数据都会被加载到driver,所以建议只在数据量少的时候使用。源码如下:
    /**
    * Return an array that contains all of the elements in this RDD.
    *
    * @note This method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
    }
    take代码太长就不发了,注释上说是先取一个分区,然后根据取到的数量预估出还需要取多少个分区。
    和collect一样,建议只在数据量少的时候使用。如果rdd里有Nothing和Null的话,会抛出异常。
    具体实现:
    先尝试从第一个分区0开始collet数据,
    如果返回数量为0的话,每次都拉已扫描分区数4倍的分区数(可以通过spark.rdd.limit.scaleUpFactor参数设置,默认值为4),
    如果返回数量大于0但是还不够需要take的数量的话,从已扫描分区数4倍的分区数和已扫描分区数预估一个需要扫描分区数(1.5*剩余需要take的数据数*已扫描分区数/已取到的数据数,然后向上取整)选一个最小值
    一直到拿到take数据数/全部分区都取完。
    /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.
    *
    * @note This method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    *
    * @note Due to complications in the internal implementation, this method will raise
    * an exception if called on an RDD of `Nothing` or `Null`.
    */

    2.使用了yarn,没有使用standalone所以也没啥生产经验,简单看了下官方部署文档:
    首先需要关注安全问题,SparkRpc的身份验证和加密,本地文件(shuffle)的加密,SparkUI的安全(身份验证,https,其他web安全)
    Event Logging和访问,客户端模式下的持久化日志的权限
    然后是高可用,spark的worker是支持高可用的,master是通过zk实现的
    作者回复

    满分!满分!💯,Perfect!

    太赞了,目前最棒的答案!

    collect和take的分析,尤其到位,尤其是take的部分!我跟编辑打个招呼,把你的答案置顶~

    2021-09-19 19:27:30

  • 路人丁

    2021-11-08 17:41:14

    老师好!讲解很精彩!
    为了帮助大家理解,还是要说说 standalone 模式下的 主从选举过程,三个节点怎么互相找到并选出主从。另外,standalone 模式下的 master 和 worker,与前面进程模型里说的 Driver 和 executor,二组之间的对应关系,也要讲讲。只要能简单串起来就可以了。让大家获得一个即便简单、但却完成的理解模型。
    作者回复

    感谢老弟,问题提得很好~

    先说说选主,这个其实比较简单,Standalone部署模式下,Master与Worker角色,这个是我们通过配置文件,事先配置好的,所以说,哪台是Master,哪台是Worker,这个配置文件里面都有。在Standalone部署下,先启动Master,然后启动Worker,由于配置中有Master的连接地址,所以Worker启动的时候,会自动去连接Master,然后双方建立心跳机制,随后集群进入ready状态。

    接下来说Master、Worker与Driver、Executors的关系。首先,这4个“家伙”,都是JVM进程。不过呢,他们的定位和角色,是完全不一样的。Master、Worker用来做资源的调度与分配,你可以这样理解,这两个家伙,只负责维护集群中可用硬件资源的状态。换句话说,Worker记录着每个计算节点可用CPU cores、可用内存,等等。而Master从Worker收集并汇总所有集群中节点的可用计算资源。

    Driver和Executors的角色,那就纯是Spark应用级别的进程了。这个咱们课程有介绍,就不赘述了。Driver、Executors的计算资源,全部来自于Master的调度。一般来说,Driver会占用Master所在节点的资源;而Executors一般占用Worker所在节点的计算资源。一旦Driver、Executors从Master、Worker那里申请到资源之后,Driver、Executors就不再“鸟”Master和Worker了,因为资源已经到手了,后续就是任务调度的范畴。任务调度课程中也有详细的介绍,老弟可以关注下~

    大概其就是这么些关系,不知道对老弟是否有所帮助~

    2021-11-09 23:17:11

  • pythonbug

    2021-10-05 23:02:45

    老师好,有一个地方一直不懂,textFile算子是在excutor端执行的吗?那岂不是每个excutor都会先读取整个文件
    作者回复

    是在Executors端执行的,但是,每个Executors只读取文件的一部分~ 可以结合后面会讲的调度系统,来理解这一点~

    2021-10-07 21:38:57

  • GAC·DU

    2021-09-17 10:06:17

    1、collect()方法回把RDD所有的元素返还给Driver端,并在Driver端把数据序列成数组,如果数据量过大,会导致Driver端内存溢出,也会导致Driver进程奔溃。但是有个问题,既然collect方法有弊端,spark开发者为什么还要对用户提供它?难道还有别的什么特殊用户场景吗?请老师指点。
    2、目前使用的是YARN进行任务调度,提交任务需要指定--master yarn 和 --deploy-mode [cluster | client]。
    有个问题咨询老师,比如说wordCount的top5,take(5)是如何知道要从哪个executor提取想要的数据的?
    作者回复

    好问题~

    1. 满分💯,我理解,collect更多的是社区为开发者提供的一种能力或者说功能,就是Spark有能力把全量结果收集到Driver,但是坦白说,这种需求在工业级应用中并不多。但是Spark必须提供这种能力或者选项,以备开发者不时之需。换句话说,collect是鸡肋,食之无味,但是不能没有。

    2. 我理解这是两个问题。先说Top5,再说take5,两个有着天壤之别。top5,其实就是全量数据之上的计算,先分组,再聚合,最后排序,这个不是从哪个Executors提取的,而是最后做归并排序(Global sort)的时候,按照大小个提取出来的,具体是哪个Executors,要看它是不是提供了top5的words。

    再说take5,这个就有意思了。top5,暗含着的意思,就是数据一定需要排序,否则哪里有“top”的概念呢。但是take不一样,take,是“随便”拿5个。在咱们的word count示例中,假设没有sortByKey,直接用take(5),那么Spark在运行时的表现,会完全不一样。具体来说,因为是“随便”取5个,所以Spark会偷懒,不会去run全量数据,而是试探性地,去run几个小的job,这些job,只用部分数据,只要最终能取到5个值,“随便取5个”的目的就达到了,这其实是Spark对于first、take这类算子的一种优化~

    2021-09-19 19:23:04

  • coderzjh

    2021-09-17 10:09:24

    讲的真好懂,老师能不能更快点哈,从来没有像现在这样爱好学习
    作者回复

    感谢老弟认可~ 这边更新进度是每周一三五哈~

    2021-09-18 12:01:44

  • A

    2022-01-13 21:09:53

    数据交换之后,所有相同的单词都分发到了相同的 Executors 上去;
    老师这里我有个问题在任务执行的时候executor之间还会进行通信嘛?
    任务分发完成之后每一个executor不就是按部就班的执行自己的任务嘛?我不需要去拉取别的executor中的数据我只需要计算我自己的就好,聚合 的时候由driver来完成就好(不过这点也说不过去因为driver不负责计算)
    疑惑点就是executor怎么通过dag来让每个executor去不同的executor上拉取数据?每个executor是如何感知之前的stage生成的数据都存在哪里?(相同的单词)难不成处理完前一个stage后executor会像driver报道我本次处理完的数据放在哪里
    麻烦老师给解答一下,或是提供个引子我去追一下spark源码
    感谢老师!
    作者回复

    其实这里需要理解Shuffle的过程哈~ 老弟可以关注后面Shuffle的部分。
    1)dag解析,拆分stages,分发tasks,这些全部由driver来做,如你所言,Executors只负责计算,
    2)在一个stage内部,各个Executors确实仅仅是做好自己的分内事,大家专注地执行自己的任务
    3)但是,在shuffle阶段,所有Executors,都需要跨节点,去访问其他所有Executors的shuffle中间文件,这部分我们在shuffle会讲。而shuffle中间文件,是存储在spark.local.dir这个配置项所配置的本地目录的。也就是每个Executors都知道,去各个节点的这个目录下面去找。至于说,每个Executors怎么知道该获取数据的哪部分,这就还要说回shuffle中间文件,它包含data和index两部分,index会标记,每个reduce task去获取哪部分数据。
    总的来说,老弟的问题,实际上是关于shuffle的,可以特别去关注那一part哈

    2022-01-15 22:31:48

  • Abigail

    2021-11-18 08:43:50

    分享的经验:工作后一开始接触分布式计算时,直接上手的就是在 Azure/AWS/GCP 利用 DataBricks 进行数据分析处理和建模,感觉自己直接跳过了很多关于 Spark 基础的东西,关于如何部署Spark系统更是没有动手经验,平时就是 Create Cluster 然后一路点下去,或启动之前配置好的 Cluster,遇到门槛直接找 IT 或 DataBricks 的技术服务就解决了。不知道会不会影响自己未来技术积累和事业发展……心里小小慌张一下,不过目前大概是知道怎么个流程了,不是那么慌了。
    在我看来这个“零基础”入门Spark其实不是很零基础~还是有一定的门槛的。
    吴老师能否考虑稍后出一个基于Spark项目实战技术的教程,配上辅助的代码和教程,这样和这个基础入门结合起来会更受欢迎的!
    作者回复

    感谢老弟提供的建议~ 和《入门篇》配套,还有一个《调优篇》:Spark性能调优实战,那门课里面更针对性能优化一些,里面的案例也更丰富,不知道是不是老弟想要的内容~

    2021-11-23 15:55:43

  • 一期一会

    2022-01-18 15:27:08

    有个问题麻烦老师回答:在咱们的word count示例中的take(5)前面运行了sortByKey(false), 那是不是各分区在分区内部做了排序,而不是所有分区进行了归并排序呢?然后take(5)就是返回各分区中取排序前几个的,最后凑出来5个结果呢?也就是说,如果各分区自己排序然后给出结果并汇总,那take(5)的结果,并不一定是全局中出现频率最高的5个单词?
    作者回复

    好问题,sortByKey其实是先shuffle,然后再merge sort,也就是:
    1)Executors局部sort
    2)用merge sort挑选出前5个
    先shuffle,其实就保证了key一样的records,被shuffle到同一节点,这样全局再做sort的时候,就不会出现你说的局部与全局不一致的情况

    2022-01-21 23:37:20

  • Abigail

    2021-11-23 16:51:30

    吴老师能否出一个更为详细一点的spark在AWS EC2上部署的note,我在AWS上尝试了一遍, 选的也是centOS7,最好好不容易把环境设置好了,在运行最后一个例子时, 又出现了一个bug如下。本人非IT需要手把手的带一遍。

    [centos@ip-172-31-40-xxxx spark_latest]$ MASTER=spark://node0:7077 $SPARK_HOME/bin/run-example org.apache.spark.examples.SparkPi
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    Exception in thread "main" org.apache.spark.SparkException: Master must either be yarn or start with spark, mesos, k8s, or local
    作者回复

    Exception in thread "main" org.apache.spark.SparkException: Master must either be yarn or start with spark, mesos, k8s, or local。

    从报错来看,Master没有设置好,老弟看下Master的spark-defaults.conf文件,可以把文件内容贴出来,一起看看,或者你加我微信也行,“方块K”或者“rJunior”。环境的配置确实比较麻烦,哪一步稍有不对,可能就跑不起来,加微信一起看看吧。

    确认下,无密码SSH没问题是吧?不同节点jps一下,看看Master和Worker是不是成功启动了,再有,用spark-shell --master spark://node0:7077,试一下Master是不是ready的,先看看spark-shell能不能起来

    2021-11-28 10:41:08

  • 钱鹏 Allen

    2021-09-18 23:58:47

    吴老师的进程模型讲得易于理解,
    各种RDD的算子计算,我们带入到日常的生活场景里去理解。Driver和Executor,包工头和工人的比喻。
    期待老师的后续课程~
    作者回复

    感谢老弟的认可~ 嗯嗯,后面继续加油~

    2021-09-19 19:48:58

  • 我叫王子小白

    2023-03-30 08:51:23

    分布式环境部署好之后,如何使用呢?spark-shell --master spark://node0:7077 直接连接master节点跑spark任务吗?
  • DeanWinchester

    2022-07-15 13:48:32

    老师您好,您提到每个Executor只负责一个数据分片,那如果数据分片总数大于Executor个数时,每个Executor会怎么执行呀?
    我猜可能是每个Executor ‘负责’多个数据分片,然后自己切换不同的分片执行吗?
    辛苦老师解答啦
  • Ty

    2022-07-06 22:19:18

    老师您好,想请问下reduceByKey不是应该先在各个分区内部先进行一次聚合再进行shuffle操作吗?为什么本文中的图解是在转成pairRDD value为1的时候就开始和driver通信了呢?
  • Geek_b2839b

    2022-05-19 08:29:05

    老师,对于统计次数,如果先使用groupby这个算子,然后再使用聚合算子统计次数,这个过程中如果group by算子shuffle后某一个单词的个数超过了一个executor内存的数量,那么后面怎么去统计次数呢
  • 2022-03-14 10:35:48

    老师好,请教如下几个问题,麻烦您得空解答下,谢谢!
    1. Driver分发任务是以什么形式进行的,是把用户写的JAVA程序编译后,每个stage都对应一段编译后的代码,每次分发任务就是把这一段编译后的代码分发给Executor?

    2. 遇到有Shuffle的算子比如reduceBy,底层是会拆分给两个函数吗,这两个函数分别对应两个Stage中 ,前一个函数用于在同一个Executor做聚合,后一个函数进行跨Executor数据聚合?

    3. 在整个job执行中,Executor的数量会发生变化吗,比如在Word Count例子中,一开始分配了3个Executor,最后执行统计的时候,可能只有两个单词,理论上来说,只需要两个Executor就够了
  • 空de

    2022-03-10 14:19:38

    请问在实际应用中, 数据分片数量是和Executors数量一致吗,还是可以自由分配分片数量?
  • 猫太太

    2021-11-18 09:39:10

    请问在自己机器上装了一个Ubunto的虚拟机,如何完成spark的分布式部署
    作者回复

    伪分布式部署的话,可以用一台节点的多个虚拟机搭建。真正的分布式的话,还是需要多台物理节点的,或者不同物理节点上面的虚拟机

    2021-11-23 15:52:24

  • welldo

    2021-10-15 14:34:18

    老师,
    图片“Driver与Executors:Spark进程模型”里,
    一个节点里有两个executor,这是啥意思呢?
    我一直以为“节点=executor”
    作者回复

    Executors就是JVM进程哈,一个计算节点,当然是可以启动多个进程的~

    我一直以为“节点=executor”,这个是不对的哈~

    2021-10-15 18:01:07

  • 子兮

    2021-09-26 22:56:23

    老师,请问每个数据分片也就是partition的执行单元不是core嘛?为什么说是excutor呢?
    作者回复

    是的,分片对应的是cpu core~
    分片、分区、task、core,一一对应

    2021-09-28 20:07:06

  • Alvin-L

    2021-09-18 07:44:16

    多谢老师,希望以后都能有类似薯片这样的形象例子,帮助新手理解里面各种听不懂的抽象概念
    作者回复

    客气兄弟~

    嗯嗯,后面还会有更多故事的~ 基本上涉及到原理的部分,都会有故事

    2021-09-18 10:37:29