01|Spark:从“大数据的Hello World”开始

你好,我是吴磊。

从这节课开始,我们先来学习Spark的“基础知识”模块,对Spark的概念和核心原理先做一个整体的了解。我并不会从RDD、DAG这些基本概念给你讲起。坦白地说,这些抽象的概念枯燥而又乏味,对于刚开始接触Spark的你来说,很难学进去。因此,我们不妨反其道而行之,先从实战入手,用一个小例子来直观地认识Spark,看看Spark都能做些什么。

这就好比我们学习一门新的编程语言,往往都是从“Hello World”开始。我还记得,刚刚学编程那会,屏幕上打印出的“Hello World”,足足让我兴奋了一整天,让我莫名地有一种“I can change the world”的冲动。

今天这一讲,我们就从“大数据的Hello World”开始,去学习怎么在Spark之上做应用开发。不过,“大数据的Hello World”并不是把字符串打印到屏幕上这么简单,而是要先对文件中的单词做统计计数,然后再打印出频次最高的5个单词,江湖人称“Word Count”。

之所以会选择Word Count,作为我们迈入Spark门槛的第一个项目,主要有两个原因,一是Word Count场景比较简单、容易理解;二是Word Count麻雀虽小,但五脏俱全,一个小小的Word Count,就能够牵引出Spark许多的核心原理,帮助我们快速入门。

好啦,话不多说,下面我们正式开启Word Count之旅。

准备工作

巧妇难为无米之炊,要做Word Count,我们得先把源文件准备好。

咱们做Word Count的初衷是学习Spark,因此源文件的内容无足轻重。这里我提取了Wikipedia中对Spark的介绍来做我们的源文件。我把它保存到了与课程配套的GitHub项目中,并把它命名为“wikiOfSpark.txt”。你可以从这里下载它。

为了跑通Word Count实例,我们还需要在本地(Local)部署Spark运行环境。这里的“本地”,指的是你手头能够获取到的任何计算资源,比如服务器、台式机,或是笔记本电脑。

在本地部署Spark运行环境非常简单,即便你从来没有和Spark打过交道,也不必担心。只需要下面这3个步骤,我们就可以完成Spark的本地部署了

  1. 下载安装包:Spark官网下载安装包,选择最新的预编译版本即可;
  2. 解压:解压Spark安装包到任意本地目录;
  3. 配置:将“${解压目录}/bin”配置到PATH环境变量。

我这里给你准备了一个本地部署的小视频,你可以直观地感受一下。

接下来,我们确认一下Spark是否部署成功。打开命令行终端,敲入“spark-shell --version”命令,如果该命令能成功地打印出Spark版本号,就表示我们大功告成了,就像这样:

在后续的实战中,我们会用spark-shell来演示Word Count的执行过程。spark-shell是提交Spark作业众多方式中的一种,我们在后续的课程中还会展开介绍,这里你不妨暂时把它当做是Spark中的Linux shell。spark-shell提供交互式的运行环境(REPL,Read-Evaluate-Print-Loop),以“所见即所得”的方式,让开发者在提交源代码之后,就可以迅速地获取执行结果。

不过,需要注意的是,spark-shell在运行的时候,依赖于Java和Scala语言环境。因此,为了保证spark-shell的成功启动,你需要在本地预装Java与Scala。好消息是,关于Java与Scala的安装,网上的资料非常丰富,你可以参考那些资料来进行安装,咱们在本讲就不再赘述Java与Scala的安装步骤啦。

梳理Word Count的计算步骤

做了一番准备之后,接下来,我们就可以开始写代码了。不过,在“下手”之前,咱们不妨一起梳理下Word Count的计算步骤,先做到心中有数,然后再垒代码也不迟。

之前我们提到,Word Count的初衷是对文件中的单词做统计计数,打印出频次最高的5个词汇。那么Word Count的第一步就很明显了,当然是得读取文件的内容,不然咱们统计什么呢?

我们准备好的文件是wikiOfSpark.txt,它以纯文本的方式记录了关于Spark的简单介绍,我摘取了其中的部分内容给你看一下:

我们知道,文件的读取往往是以行(Line)为单位的。不难发现,wikiOfSpark.txt的每一行都包含多个单词。

我们要是以“单词”作为粒度做计数,就需要对每一行的文本做分词。分词过后,文件中的每一句话,都被打散成了一个个单词。这样一来,我们就可以按照单词做分组计数了。这就是Word Count的计算过程,主要包含如下3个步骤:

  1. 读取内容:调用Spark文件读取API,加载wikiOfSpark.txt文件内容;
  2. 分词:以行为单位,把句子打散为单词;
  3. 分组计数:按照单词做分组计数。

明确了计算步骤后,接下来我们就可以调用Spark开发API,对这些步骤进行代码实现,从而完成Word Count的应用开发。

众所周知,Spark支持种类丰富的开发语言,如Scala、Java、Python,等等。你可以结合个人偏好和开发习惯,任意选择其中的一种进行开发。尽管不同语言的开发API在语法上有着细微的差异,但不论是功能方面、还是性能方面,Spark对于每一种语言的支持都是一致的。换句话说,同样是Word Count,你用Scala实现也行,用Python实现也可以,两份代码的执行结果是一致的。不仅如此,在同样的计算资源下,两份代码的执行效率也是一样的。

因此,就Word Count这个示例来说,开发语言不是重点,我们不妨选择Scala。你可能会说:“我本来对Spark就不熟,更没有接触过Scala,一上来就用Scala演示Spark应用代码,理解起来会不会很困难?”

其实大可不必担心,Scala语法比较简洁,Word Count的Scala实现不超过10行代码。再者,对于Word Count中的每一行Scala代码,我会带着你手把手、逐行地进行讲解和分析。我相信,跟着我过完一遍代码之后,你能很快地把它“翻译”成你熟悉的语言,比如Java或Python。另外,绝大多数的Spark 源码都是由 Scala 实现的,接触并了解一些Scala的基本语法,有利于你后续阅读、学习Spark源代码。

Word Count代码实现

选定了语言,接下来,我们就按照读取内容、分词、分组计数这三步来看看Word Count具体怎么实现。

第一步,读取内容

首先,我们调用SparkContext的textFile方法,读取源文件,也就是wikiOfSpark.txt,代码如下表所示:

import org.apache.spark.rdd.RDD
 
// 这里的下划线"_"是占位符,代表数据文件的根目录
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
 
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file) 

在这段代码中,你可能会发现3个新概念,分别是spark、sparkContext和RDD。

其中,spark和sparkContext分别是两种不同的开发入口实例:

  • spark是开发入口SparkSession实例(Instance),SparkSession在spark-shell中会由系统自动创建;
  • sparkContext是开发入口SparkContext实例。

在Spark版本演进的过程中,从2.0版本开始,SparkSession取代了SparkContext,成为统一的开发入口。换句话说,要开发Spark应用,你必须先创建SparkSession。关于SparkSession和SparkContext,我会在后续的课程做更详细的介绍,这里你只要记住它们是必需的开发入口就可以了。

我们再来看看RDD,RDD的全称是Resilient Distributed Dataset,意思是“弹性分布式数据集”。RDD是Spark对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。关于RDD的定义、内涵与作用,我们留到下一讲再去展开。

在这里,你不妨先简单地把RDD理解成“数组”,比如代码中的lineRDD变量,它的类型是RDD[String],你可以暂时把它当成元素类型是String的数组,数组的每个元素都是文件中的一行字符串。

获取到文件内容之后,下一步我们就要做分词了。

第二步,分词

“分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用RDD的flatMap方法来完成。flatMap操作在逻辑上可以分成两个步骤:映射展平

这两个步骤是什么意思呢?我们还是结合Word Count的例子来看:

// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" ")) 

要把lineRDD的行元素转换为单词,我们得先用分隔符对每个行元素进行分割(Split),咱们这里的分隔符是空格。

分割之后,每个行元素就都变成了单词数组,元素类型也从String变成了Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。

映射过后,RDD类型由原来的RDD[String]变为RDD[Array[String]]。如果把RDD[String]看成是“数组”的话,那么RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。

为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套结构,把“二维数组”还原成“一维数组”,如下图所示。

就这样,在flatMap算子的作用下,原来以行为元素的lineRDD,转换成了以单词为元素的wordRDD。

不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用RDD的filter方法来过滤:

// 过滤掉空字符串
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

这样一来,我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”,类型是RDD[String]。接下来,我们就可以准备做分组计数了。

第三步,分组计数

在RDD的开发框架下,聚合类操作,如计数、求和、求均值,需要依赖键值对(Key Value Pair)类型的数据元素,也就是(Key,Value)形式的“数组”元素。

因此,在调用聚合算子做分组计数之前,我们要先把RDD元素转换为(Key,Value)的形式,也就是把RDD[String]映射成RDD[(String, Int)]。

其中,我们统一把所有的Value置为1。这样一来,对于同一个的单词,在后续的计数运算中,我们只要对Value做累加即可,就像这样:

下面是对应的代码:

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1)) 

这样一来,RDD就由原来存储String元素的cleanWordRDD,转换为了存储(String,Int)的kvRDD。

完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子reduceByKey来同时完成分组和计数这两个操作。

对于kvRDD这个键值对“数组”,reduceByKey先是按照Key(也就是单词)来做分组,分组之后,每个单词都有一个与之对应的Value列表。然后根据用户提供的聚合函数,对同一个Key的所有Value做reduce运算。

这里的reduce,你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后,它会用折叠的方式,把包含多个元素的列表转换为单个元素值,从而统计出不同元素的数量。

在Word Count的示例中,我们调用reduceByKey实现分组计算的代码如下:

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y) 

可以看到,我们传递给reduceByKey算子的聚合函数是(x, y) => x + y,也就是累加函数。因此,在每个单词分组之后,reduce会使用累加函数,依次折叠计算Value列表中的所有元素,最终把元素列表转换为单词的频次。对于任意一个单词来说,reduce的计算过程都是一样的,如下图所示。

reduceByKey完成计算之后,我们得到的依然是类型为RDD[(String, Int)]的RDD。不过,与kvRDD不同,wordCounts元素的Value值,记录的是每个单词的统计词频。到此为止,我们就完成了Word Count主逻辑的开发与实现。

在程序的最后,我们还要把wordCounts按照词频做排序,并把词频最高的5个单词打印到屏幕上,代码如下所示。

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

代码执行

应用开发完成之后,我们就可以把代码丢进已经准备好的本地Spark部署环境里啦。首先,我们打开命令行终端(Terminal),敲入“spark-shell”,打开交互式运行环境,如下图所示。

然后,把我们开发好的代码,依次敲入spark-shell。为了方便你操作,我把完整的代码实现整理到下面了:

import org.apache.spark.rdd.RDD
 
// 这里的下划线"_"是占位符,代表数据文件的根目录
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
 
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
 
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
 
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
 
// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

我们把上面的代码依次敲入到spark-shell之后,spark-shell最终会把词频最高的5个单词打印到屏幕上:

在Wikipedia的Spark介绍文本中,词频最高的单词分别是the、Spark、a、and和of,除了“Spark”之外,其他4个单词都是常用的停用词(Stop Word),因此它们几个高居榜首也就不足为怪了。

好啦,到此为止,我们在Spark之上,完成了“大数据领域Hello World”的开发与实现,恭喜你跨入大数据开发的大门!

重点回顾

今天这一讲,我们围绕着Word Count,初步探索并体验了Spark应用开发。你首先需要掌握的是Spark的本地部署,从而可以通过spark-shell来迅速熟悉Spark,获得对Spark的“第一印象”。要在本地部署Spark,你需要遵循3个步骤:

  • Spark官网下载安装包,选择最新的预编译版本即可;
  • 解压Spark安装包到任意本地目录;
  • 将“${解压目录}/bin”配置到PATH环境变量。

然后,我们一起分析并实现了入门Spark的第一个应用程序:Word Count。在我们的例子中,Word Count要完成的计算任务,是先对文件中的单词做统计计数,然后再打印出频次最高的5个单词。它的实现过程分为3个步骤:

  • 读取内容:调用Spark文件读取API,加载wikiOfSpark.txt文件内容;
  • 分词:以行为单位,把句子打散为单词;
  • 分组计数:按照单词做分组计数。

也许你对RDD API还不熟悉,甚至从未接触过Scala,不过没关系,完成了这次“大数据的Hello World”开发之旅,你就已经踏上了新的征程。在接下来的课程里,让我们携手并肩,像探索新大陆一样,一层一层地剥开Spark的神秘面纱,加油!

每课一练

在Word Count的代码实现中,我们用到了多种多样的RDD算子,如map、filter、flatMap和reduceByKey,除了这些算子以外,你知道还有哪些常用的RDD算子吗?(提示,可以结合官网去查找)。

另外,你能说说,以上这些算子都有哪些共性或是共同点吗?

欢迎你把答案分享到评论区,我在评论区等你。

如果这一讲对你有帮助,也欢迎你分享给自己的朋友,我们下一讲再见!

精选留言

  • Alvin-L

    2021-09-20 18:31:03

    在Python中运行:
    ```
    from pyspark import SparkContext

    textFile = SparkContext().textFile("./wikiOfSpark.txt")
    wordCount = (
    textFile.flatMap(lambda line: line.split(" "))
    .filter(lambda word: word != "")
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda x: x[1], False)
    .take(5)
    )
    print(wordCount)
    #显示: [('the', 67), ('Spark', 63), ('a', 54), ('and', 51), ('of', 50)]
    ```
    作者回复

    赞👍,Perfect!

    可以作为Python标杆代码,供后续同学参考~

    2021-09-23 00:10:30

  • liugddx

    2021-09-07 13:33:13

    我是一个大数据小白,我想咨询下spark和hadoop在大数据体系下的关系?
    作者回复

    好问题,Hadoop的范畴可大可小。

    往小了说,Hadoop特指HDFS、YARN、MapReduce这三个组件,他们分别是Hadoop分布式文件系统、分布式任务调度框架、分布式计算引擎。

    往大了说,Hadoop生态包含所有由这3个组件衍生出的大数据产品,如Hive、Hbase、Pig、Sqoop,等等。

    Spark和Hadoop的关系,是共生共赢的关系。Spark的定位是分布式计算引擎,因此,它的直接“竞争对手”,是MapReduce,也就是Hadoop的分布式计算引擎。Spark是内存计算引擎,而MapReduce在计算的过程中,需要频繁落盘,因此,一般来说,相比MapReduce,Spark在执行性能上,更胜一筹。

    对于HDFS、YARN,Spark可与之完美结合,实际上,在很多的使用场景中,Spark的数据源往往存储于HDFS,而YARN是Spark重要的资源调度框架之一。

    大体上是这些,当然,还可以说的更细,老弟可以继续在后台留言,我们继续讨论~

    2021-09-07 22:14:47

  • Neo-dqy

    2021-09-18 13:53:53

    老师好!wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)这行代码我还存在疑问,为什么这里的map函数使用了花括号{ }而不是上面一些算子的( ),同时这个case又是什么意思?这一行代码非常像我曾经在Python中使用字典数据结构,然后根据字典值的升序排序。最后,貌似Scala语言本身就可以实现wordcount案例,那么它本身的实现和spark实现相比,spark有什么优势呢?
    作者回复

    Scala语法确实比较灵活,一般来说,简单表达式用(),复杂表达式用{}。

    比如,简单函数体:(x => x + 1)、(_ + 1),等等;
    复杂函数体:{case x: String => “一大堆关于x的转换” }

    关于最后的问题,也就是Scala也可以实现Word Count,和Spark有什么区别。这个问题比较重要,老弟需要用心听一下~

    其实,任何一种语言,都可以实现任何计算逻辑,毕竟是高级编程语言,基本上都是图灵完备的。所以说像word count这种非常简单的逻辑,不只是scala,其他语言都能搞定。但是,Scala也好、Java也罢,再或者是Python,他们实现的word count,只能在单机跑,而Spark实现的Word Count,是在分布式环境跑。这,是本质的区别。

    Spark,最核心的能力,就是分布式计算,利用大规模集群的能力。这是最本质的区别。

    比如,现在有2万亿行的文本,需要你计算word count,你用scala也能在单机实现,但是,这个word count只能在单机跑,对于一般的机器,大概率是跑不动的。

    可是,在分布式集群中,这样的量级,就轻松得多了。Spark,更多的,是提供一种分布式计算的能力,它提供给开发者简单的开发API,让开发者可以像开发单机应用那样,轻而易举地开发分布式应用,让分布式计算对于开发者来说,变得透明。开发者只需要关注业务逻辑,或者说计算逻辑,而不必关心分布式系统底层的调度、分发、数据交换、等等和分布式计算本身有关的东西。

    所以说,Scala和Spark,两者没有可比性。当然,除了Spark,现在还有非常多的分布式计算框架,比如MapReduce、Flink、Presto、TensorFlow,等等,大家都是玩分布式计算的,只不过“术业有专攻”,各司其职~

    2021-09-19 19:44:25

  • 杨帅

    2022-06-13 23:52:08

    老师,spark-3.2.1 默认从hdfs读取文件,所以rootPath需要加上协议 file://.
  • Vic

    2021-09-07 12:39:41

    遇到这个问题
    scala> val rootPath: String = _
    <console>:24: error: unbound placeholder parameter
    val rootPath: String = _
    网上搜一下,说这是汇编错误。
    要把val 改成var , 但会遇到"_"这default值是null。
    org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/vic/src/data/null/wikiOfSpark.txt
    这一段就先跳过root_path,直接给file一个路径,是可以成功运行"word count",得到和老师一样的结果:
    [Stage 0:> (0 + 2) / res0: Array[(Int, String)] = Array((67,the), (63,Spark), (54,a), (51,and), (50,of))
    作者回复

    这个是我的锅,哈哈~

    这里是我偷懒了,我应该在这条code上面加个注释,这里“_”(下划线)的意思,是你的文件根目录。Scala里面,用“_”表示一些不重要、不关心的东西,所以我偷用了Scala的“_”。但这里确实会引起困惑,不用管这个,我的锅,用你的文件根目录替换掉这里的“_”就好~

    2021-09-07 22:18:21

  • Neo-dqy

    2021-09-18 16:38:41

    老师我可以再问一下,如果我是用IDEA创建Spark项目,是不是只要配置好Scala的SDK,然后在pom文件中加入对应版本号的spark依赖,就会自动下载spark包了?这个时候不需要再去官网下载spark了吗,同时也不再需要使用spark-shell了吗?
    作者回复

    是的,没错~

    2021-09-19 19:47:27

  • Abigail

    2021-09-07 13:22:12

    前排占座!三年前接触过 Spark 今天从头再学!
    作者回复

    欢迎~

    2021-09-07 22:15:20

  • 陈金鑫

    2022-04-22 12:36:00

    又学了一遍,wordcount一句话实现:
    spark.sparkContext.textFile(file).flatMap(_.split(" ")).filter(!_.equals("")).map((_, 1)).reduceByKey(_ + _).map(t => (t._2, t._1)).sortByKey(false).take(5)
  • 浮生若梦

    2021-12-24 08:59:44

    Java实现:

    SparkConf sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]");
    JavaSparkContext JSC = new JavaSparkContext(sparkConf);

    // 读取文件内容
    JavaRDD<String> lineRDD = JSC.textFile("wikiOfSpark.txt");
    // 以行为单位做分词
    JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" ")).iterator();
    }
    });
    JavaRDD<String> cleanWordRDD = wordRDD.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
    return !s.equals("");
    }
    });

    // 把RDD元素转换为(Key,Value)的形式
    JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s,1);
    }
    });
    // 按照单词做分组计数
    JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
    return integer+integer2;
    }
    });
    // 打印词频最高的5个词汇(先将元组的key value交换一下顺序,然后在调用sortByKey())
    wordCounts.mapToPair((row)-> new Tuple2<>(row._2,row._1)).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, String>>() {
    @Override
    public void call(Tuple2<Integer, String> stringIntegerTuple2) throws Exception {
    System.out.println(stringIntegerTuple2._1 + ":" + stringIntegerTuple2._2);
    }
    });

    //关闭context
    JSC.close();
    作者回复

    满分💯,赞👍~

    2021-12-24 22:21:14

  • 火炎焱燚

    2021-09-17 21:47:44

    Python版代码为:

    file='~~~/wikiOfSpark.txt'
    lineRDD=sc.textFile(file)
    lineRDD.first() # 会打印出lineRDD的第一行: u'Apache Spark',如果出错则不打印
    wordRDD=lineRDD.flatMap(lambda line: line.split(" "))
    cleanWordRDD=wordRDD.filter(lambda word: word!='')
    kvRDD=cleanWordRDD.map(lambda word:(word,1))
    wordCounts=kvRDD.reduceByKey(lambda x,y:x+y)
    wordCounts.map(lambda (k,v):(v,k)).sortByKey(False).take(5)
    作者回复

    Cool~

    后面等专栏更完了,打算把所有Scala代码整理出一份Python版本的,老弟有没有兴趣一起呀~

    2021-09-19 19:34:59

  • Unknown element

    2021-09-17 21:19:56

    问下执行 val lineRDD: RDD[String] = spark.sparkContext.textFile(file) 报错error: not found: value spark是怎么回事?
    作者回复

    spark-shell中,spark是默认的SparkSession实例,你是在spark-shell中执行这段代码吗?如果是在IDE,需要自己明确定义SparkSession实例的,比如:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
    .master("local[2]")
    .appName("SparkSession Example")
    .getOrCreate()

    2021-09-19 19:33:58

  • 国度

    2022-02-05 14:59:12

    2022年2月5号学习打卡记录
    机器环境:ROG14
    操作系统:win11 + wsl Ubuntu20.04
    环境变量:
    ----------------------------

    export SPARK_HOME=/mnt/c/spark/spark-2.4.8-bin-hadoop2.7
    export JAVA_HOME=/mnt/c/linux_environment/jdk/jdk1.8.0_321
    export M2_HOME=/mnt/c/linux_environment/apache-maven-3.8.4
    export SCALA_HOME=/mnt/c/linux_environment/scala3-3.1.1

    export PATH=$SPARK_HOME/bin:$SCALA_HOME/bin:$M2_HOME/bin:$JAVA_HOME/bin:$PATH

    ---------------------------
    希望帮助和我一样从零开始一起学习的同学躲避一些坑:

    坑1:jdk版本不兼容:
    一开始使用jdk17版本,在启动过程中一直报错,降为1.8后启动成功;

    坑2:hadoop版本问题:
    hadoop3.2.1 逐步使用Dataset,报错类型转换异常;
    由于scala经验不足,暂时无法大规模改写老师的代码,降低版本为spark2.4.8
    下载地址:https://dlcdn.apache.org/spark/ 可以选择适合的版本下载

    原理性的还没有搞懂,目前在第一阶段,读懂,简单改写为主;

    感谢吴磊老师的课
    作者回复

    赞学习打卡~ 一起加油~

    2022-02-12 21:59:56

  • 猫太太

    2021-11-18 09:16:49

    请问在本地部署spark环境不需要先安装hadoop么
    作者回复

    不需要的~

    2021-11-23 15:51:08

  • ZJ

    2023-11-25 03:23:37

    Java Version:

    public static void main(String[] args) {
    System.setProperty("hadoop.home.dir", "C:\\hadoop");
    SparkConf conf = new SparkConf().setAppName("spark-in-Java").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lineRdd = sc.textFile("src/main/resources/sparkDev.txt");
    lineRdd.flatMap(k -> Arrays.asList(k.split(" ")).iterator())
    .mapToPair(k -> new Tuple2<>(k, 1))
    .reduceByKey((k1, k2) -> k1 + k2)
    .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1))
    .sortByKey(false).take(10)
    .forEach(k -> System.out.println(k._2 + " has " + k._1));

    sc.close();
    }
  • Geek_f09cec

    2022-10-27 15:49:17

    将“${解压目录}/bin”配置到 PATH 环境变量。这一步不懂,在哪儿配置,怎么配置呢?
  • 🌈你是人间四月天💫

    2022-07-04 14:18:15

    像我这种零基础的,看到很懵,第一个demo代码是spark,这个变量怎么来的?这个真的适合零基础?
  • Gavin_From_Mars

    2022-04-20 01:38:29

    按照文中安装后一直有问题。。。启动spark看到默认带scala2.12.15.但是启动spark shell有错误。“error: not found: value spark”,然后在shell中执行文中的wordcount也是同样的错误
  • 我爱夜来香

    2022-04-11 14:51:59

    老师,DAG计算模型的优势可不可以这么理解:spark在没有遇到宽依赖之前,所有的操作包括wordRDD到lineRDD到wordRDD到kvRDD等都是一次性在内存中完成的,最终输出kvRDD到磁盘中进行shuffle,只会有一次落盘操作;
    而mapReduce中比如把整个文本切割成一行一行,在内存中完成后需要落盘,再由一行一行转换成键值对又要落盘.中间涉及了太多写盘操作。换句话说,spark基于内存计算就是充分利用内存?麻烦老师解答一下
  • 巴普洛夫的

    2021-09-12 18:01:01

    wordCounts.map{case (k, v) => (v, k)}.sortByKey(false)
    这一步是做了什么呢,没有见过的语法
    作者回复

    一步步说哈~

    map{case (k, v) => (v, k)},这一步是把(Key,Value)对调,目的是按照“计数”来排序。比如,原来是(Spark,63),这一步之后,这条记录就变成了(63,Spark),不妨把这一步拆解开来,用first探索一下~

    sortByKey(false) 是降序排序,注意,这时候的Key,不再是单词了,而是调换顺序之后的Value(比如63),也就是单词计数。

    2021-09-12 23:35:25

  • Z

    2021-09-08 23:10:22

    为啥我的结果是单个字母呢?
    作者回复

    看看分隔符是不是用了空字符串?分隔符应该是空格“ ”

    2021-09-10 09:19:12