03 | RDD常用算子(一):RDD内部的数据转换

你好,我是吴磊。

在上一讲的最后,我们用一张表格整理了Spark官网给出的RDD算子。想要在Spark之上快速实现业务逻辑,理解并掌握这些算子无疑是至关重要的。

因此,在接下来的几讲,我将带你一起梳理这些常见算子的用法与用途。不同的算子,就像是厨房里的炒勺、铲子、刀具和各式各样的锅碗瓢盆,只有熟悉了这些“厨具”的操作方法,才能在客人点餐的时候迅速地做出一桌好菜。

今天这一讲,我们先来学习同一个RDD内部的数据转换。掌握RDD常用算子是做好Spark应用开发的基础,而数据转换类算子则是基础中的基础,因此我们优先来学习这类RDD算子。

在这些算子中,我们重点讲解的就是map、mapPartitions、flatMap、filter。这4个算子几乎囊括了日常开发中99%的数据转换场景,剩下的mapPartitionsWithIndex,我把它留给你作为课后作业去探索。

图片

俗话说,巧妇难为无米之炊,要想玩转厨房里的厨具,我们得先准备好米、面、油这些食材。学习RDD算子也是一样,要想动手操作这些算子,咱们得先有RDD才行。

所以,接下来我们就一起来看看RDD是怎么创建的。

创建RDD

在Spark中,创建RDD的典型方式有两种:

  • 通过SparkContext.parallelize在内部数据之上创建RDD;
  • 通过SparkContext.textFile等API从外部数据创建RDD。

这里的内部、外部是相对应用程序来说的。开发者在Spark应用中自定义的各类数据结构,如数组、列表、映射等,都属于“内部数据”;而“外部数据”指代的,是Spark系统之外的所有数据形式,如本地文件系统或是分布式文件系统中的数据,再比如来自其他大数据组件(Hive、Hbase、RDBMS等)的数据。

第一种创建方式的用法非常简单,只需要用parallelize函数来封装内部数据即可,比如下面的例子:

import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)

你可以在spark-shell中敲入上述代码,来直观地感受parallelize创建RDD的过程。通常来说,在Spark应用内定义体量超大的数据集,其实都是不太合适的,因为数据集完全由Driver端创建,且创建完成后,还要在全网范围内跨节点、跨进程地分发到其他Executors,所以往往会带来性能问题。因此,parallelize API的典型用法,是在“小数据”之上创建RDD。

要想在真正的“大数据”之上创建RDD,我们还得依赖第二种创建方式,也就是通过SparkContext.textFile等API从外部数据创建RDD。由于textFile API比较简单,而且它在日常的开发中出现频率比较高,因此我们使用textFile API来创建RDD。在后续对各类RDD算子讲解的过程中,我们都会使用textFile API从文件系统创建RDD。

为了保持讲解的连贯性,我们还是使用第一讲中的源文件wikiOfSpark.txt来创建RDD,代码实现如下所示:

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

好啦,创建好了RDD,我们就有了可以下锅的食材。接下来,咱们就要正式地走进厨房,把铲子和炒勺挥起来啦。

RDD内的数据转换

首先,我们先来认识一下map算子。毫不夸张地说,在所有的RDD算子中,map“出场”的概率是最高的。因此,我们必须要掌握map的用法与注意事项。

map:以元素为粒度的数据转换

我们先来说说map算子的用法:给定映射函数f,map(f)以元素为粒度对RDD做数据转换。其中f可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与RDD的元素类型保持一致,而输出类型则任由开发者自行决定。

这种照本宣科的介绍听上去难免会让你有点懵,别着急,接下来我们用些小例子来更加直观地展示map的用法。

第一讲的Word Count示例中,我们使用如下代码,把包含单词的RDD转换成元素为(Key,Value)对的RDD,后者统称为Paired RDD。

// 把普通RDD转换为Paired RDD
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

在上面的代码实现中,传递给map算子的形参,即:word => (word,1),就是我们上面说的映射函数f。只不过,这里f是以匿名函数的方式进行定义的,其中左侧的word表示匿名函数f的输入形参,而右侧的(word,1)则代表函数f的输出结果。

如果我们把匿名函数变成带名函数的话,可能你会看的更清楚一些。这里我用一段代码重新定义了带名函数f。

// 把RDD元素转换为(Key,Value)的形式
 
// 定义映射函数f
def f(word: String): (String, Int) = {
return (word, 1)
}
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

可以看到,我们使用Scala的def语法,明确定义了带名映射函数f,它的计算逻辑与刚刚的匿名函数是一致的。在做RDD数据转换的时候,我们只需把函数f传递给map算子即可。不管f是匿名函数,还是带名函数,map算子的转换逻辑都是一样的,你不妨把以上两种实现方式分别敲入到spark-shell,去验证执行结果的一致性。

到这里为止,我们就掌握了map算子的基本用法。现在你就可以定义任意复杂的映射函数f,然后在RDD之上通过调用map(f)去翻着花样地做各种各样的数据转换。

比如,通过定义如下的映射函数f,我们就可以改写Word Count的计数逻辑,也就是把“Spark”这个单词的统计计数权重提高一倍:

// 把RDD元素转换为(Key,Value)的形式
 
// 定义映射函数f
def f(word: String): (String, Int) = {
if (word.equals("Spark")) { return (word, 2) }
return (word, 1)
}
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

尽管map算子足够灵活,允许开发者自由定义转换逻辑。不过,就像我们刚刚说的,map(f)是以元素为粒度对RDD做数据转换的,在某些计算场景下,这个特点会严重影响执行效率。为什么这么说呢?我们来看一个具体的例子。

比方说,我们把Word Count的计数需求,从原来的对单词计数,改为对单词的哈希值计数,在这种情况下,我们的代码实现需要做哪些改动呢?我来示范一下:

// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  // 获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  // 使用MD5计算哈希值
  val hash = md5.digest(word.getBytes).mkString
  // 返回哈希值与数字1的Pair
  (hash, 1)
}

由于map(f)是以元素为单元做转换的,那么对于RDD中的每一条数据记录,我们都需要实例化一个MessageDigest对象来计算这个元素的哈希值。

在工业级生产系统中,一个RDD动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建MessageDigest,那么实例化对象的开销就会聚沙成塔,不知不觉地成为影响执行效率的罪魁祸首。

那么问题来了,有没有什么办法,能够让Spark在更粗的数据粒度上去处理数据呢?还真有,mapPartitions和mapPartitionsWithIndex这对“孪生兄弟”就是用来解决类似的问题。相比mapPartitions,mapPartitionsWithIndex仅仅多出了一个数据分区索引,因此接下来我们把重点放在mapPartitions上面。

mapPartitions:以数据分区为粒度的数据转换

按照介绍算子的惯例,我们还是先来说说mapPartitions的用法。mapPartitions,顾名思义,就是以数据分区为粒度,使用映射函数f对RDD进行数据转换。对于上述单词哈希值计数的例子,我们结合后面的代码,来看看如何使用mapPartitions来改善执行性能:

// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  // 注意!这里是以数据分区为粒度,获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  val newPartition = partition.map( word => {
  // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
    (md5.digest(word.getBytes()).mkString,1)
  })
  newPartition
})

可以看到,在上面的改进代码中,mapPartitions以数据分区(匿名函数的形参partition)为粒度,对RDD进行数据转换。具体的数据处理逻辑,则由代表数据分区的形参partition进一步调用map(f)来完成。你可能会说:“partition. map(f)仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?”

仔细观察,你就会发现,相比前一个版本,我们把实例化MD5对象的语句挪到了map算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该MD5对象,从而完成单词到哈希值的转换。

通过下图的直观对比,你会发现,以数据分区为单位,mapPartitions只需实例化一次MD5对象,而map算子却需要实例化多次,具体的次数则由分区内数据记录的数量来决定。

图片

对于一个有着上百万条记录的RDD来说,其数据分区的划分往往是在百这个量级,因此,相比map算子,mapPartitions可以显著降低对象实例化的计算开销,这对于Spark作业端到端的执行性能来说,无疑是非常友好的。

实际上。除了计算哈希值以外,对于数据记录来说,凡是可以共享的操作,都可以用mapPartitions算子进行优化。这样的共享操作还有很多,比如创建用于连接远端数据库的Connections对象,或是用于连接Amazon S3的文件系统句柄,再比如用于在线推理的机器学习模型,等等,不一而足。你不妨结合实际工作场景,把你遇到的共享操作整理到留言区,期待你的分享。

相比mapPartitions,mapPartitionsWithIndex仅仅多出了一个数据分区索引,这个数据分区索引可以为我们获取分区编号,当你的业务逻辑中需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码。除了这个额外的分区索引以外,mapPartitionsWithIndex在其他方面与mapPartitions是完全一样的。

介绍完map与mapPartitions算子之后,接下来,我们趁热打铁,再来看一个与这两者功能类似的算子:flatMap。

flatMap:从元素到集合、再从集合到元素

flatMap其实和map与mapPartitions算子类似,在功能上,与map和mapPartitions一样,flatMap也是用来做数据映射的,在实现上,对于给定映射函数f,flatMap(f)以元素为粒度,对RDD进行数据转换。

不过,与前两者相比,flatMap的映射函数f有着显著的不同。对于map和mapPartitions来说,其映射函数f的类型,都是(元素) => (元素),即元素到元素。而flatMap映射函数f的类型,是(元素) => (集合),即元素到集合(如数组、列表等)。因此,flatMap的映射过程在逻辑上分为两步:

  • 以元素为单位,创建集合;
  • 去掉集合“外包装”,提取集合元素。

这么说比较抽象,我们还是来举例说明。假设,我们再次改变Word Count的计算逻辑,由原来统计单词的计数,改为统计相邻单词共现的次数,如下图所示:

图片

对于这样的计算逻辑,我们该如何使用flatMap进行实现呢?这里我们先给出代码实现,然后再分阶段地分析flatMap的映射过程:

// 读取文件内容
val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码
// 以行为单位提取相邻单词
val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
  // 将行转换为单词数组
  val words: Array[String] = line.split(" ")
  // 将单个单词数组,转换为相邻单词数组
  for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
})

在上面的代码中,我们采用匿名函数的形式,来提供映射函数f。这里f的形参是String类型的line,也就是源文件中的一行文本,而f的返回类型是Array[String],也就是String类型的数组。在映射函数f的函数体中,我们先用split语句把line转化为单词数组,然后再用for循环结合yield语句,依次把单个的单词,转化为相邻单词词对。

注意,for循环返回的依然是数组,也即类型为Array[String]的词对数组。由此可见,函数f的类型是(String) => (Array[String]),也就是刚刚说的第一步,从元素到集合。但如果我们去观察转换前后的两个RDD,也就是lineRDD和wordPairRDD,会发现它们的类型都是RDD[String],换句话说,它们的元素类型都是String。

回顾map与mapPartitions这两个算子,我们会发现,转换前后RDD的元素类型,与映射函数f的类型是一致的。但在flatMap这里,却出现了RDD元素类型与函数类型不一致的情况。这是怎么回事呢?其实呢,这正是flatMap的“奥妙”所在,为了让你直观地理解flatMap的映射过程,我画了一张示意图,如下所示:

图片

不难发现,映射函数f的计算过程,对应着图中的步骤1与步骤2,每行文本都被转化为包含相邻词对的数组。紧接着,flatMap去掉每个数组的“外包装”,提取出数组中类型为String的词对元素,然后以词对为单位,构建新的数据分区,如图中步骤3所示。这就是flatMap映射过程的第二步:去掉集合“外包装”,提取集合元素

得到包含词对元素的wordPairRDD之后,我们就可以沿用Word Count的后续逻辑,去计算相邻词汇的共现次数。你不妨结合文稿中的代码与第一讲中Word Count的代码,去实现完整版的“相邻词汇计数统计”。

filter:过滤RDD

在今天的最后,我们再来学习一下,与map一样常用的算子:filter。filter,顾名思义,这个算子的作用,是对RDD进行过滤。就像是map算子依赖其映射函数一样,filter算子也需要借助一个判定函数f,才能实现对RDD的过滤转换。

所谓判定函数,它指的是类型为(RDD元素类型) => (Boolean)的函数。可以看到,判定函数f的形参类型,必须与RDD的元素类型保持一致,而f的返回结果,只能是True或者False。在任何一个RDD之上调用filter(f),其作用是保留RDD中满足f(也就是f返回True)的数据元素,而过滤掉不满足f(也就是f返回False)的数据元素。

老规矩,我们还是结合示例来讲解filter算子与判定函数f。

在上面flatMap例子的最后,我们得到了元素为相邻词汇对的wordPairRDD,它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素,我们希望结合标点符号列表,对wordPairRDD进行过滤。例如,我们希望过滤掉像“Spark-&”、“|-data”这样的词对。

掌握了filter算子的用法之后,要实现这样的过滤逻辑,我相信你很快就能写出如下的代码实现:

// 定义特殊字符列表
val list: List[String] = List("&", "|", "#", "^", "@")
 
// 定义判定函数f
def f(s: String): Boolean = {
val words: Array[String] = s.split("-")
val b1: Boolean = list.contains(words(0))
val b2: Boolean = list.contains(words(1))
return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
}
 
// 使用filter(f)对RDD进行过滤
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)

掌握了filter算子的用法之后,你就可以定义任意复杂的判定函数f,然后在RDD之上通过调用filter(f)去变着花样地做数据过滤,从而满足不同的业务需求。

重点回顾

好啦,到此为止,关于RDD内数据转换的几个算子,我们就讲完了,我们一起来做个总结。今天这一讲,你需要掌握map、mapPartitions、flatMap和filter这4个算子的作用和具体用法。

首先,我们讲了map算子的用法,它允许开发者自由地对RDD做各式各样的数据转换,给定映射函数f,map(f)以元素为粒度对RDD做数据转换。其中f可以是带名函数,也可以是匿名函数,它的形参类型必须与RDD的元素类型保持一致,而输出类型则任由开发者自行决定。

为了提升数据转换的效率,Spark提供了以数据分区为粒度的mapPartitions算子。mapPartitions的形参是代表数据分区的partition,它通过在partition之上再次调用map(f)来完成数据的转换。相比map,mapPartitions的优势是以数据分区为粒度初始化共享对象,这些共享对象在我们日常的开发中很常见,比如数据库连接对象、S3文件句柄、机器学习模型等等。

紧接着,我们介绍了flatMap算子。flatMap的映射函数f比较特殊,它的函数类型是(元素) => (集合),这里集合指的是像数组、列表这样的数据结构。因此,flatMap的映射过程在逻辑上分为两步,这一点需要你特别注意:

  • 以元素为单位,创建集合;
  • 去掉集合“外包装”,提取集合元素。

最后,我们学习了filter算子,filter算子的用法与map很像,它需要借助判定函数f来完成对RDD的数据过滤。判定函数的类型必须是(RDD元素类型) => (Boolean),也就是形参类型必须与RDD的元素类型保持一致,返回结果类型则必须是布尔值。RDD中的元素是否能够得以保留,取决于判定函数f的返回值是True还是False。

虽然今天我们只学了4个算子,但这4个算子在日常开发中的出现频率非常之高。掌握了这几个简单的RDD算子,你几乎可以应对RDD中90%的数据转换场景。希望你对这几个算子多多加以练习,从而在日常的开发工作中学以致用。

每课一练

讲完了正课,我来给你留3个思考题:

1.请你结合官网的介绍,自学mapPartitionsWithIndex算子。请你说一说,在哪些场景下可能会用到这个算子?

2.对于我们今天学过的4个算子,再加上没有详细解释的mapPartitionsWithIndex,你能说说,它们之间有哪些共性或是共同点吗?

3.你能说一说,在日常的工作中,还遇到过哪些可以在mapPartitions中初始化的共享对象呢?

欢迎你在评论区回答这些练习题。你也可以把这一讲分享给更多的朋友或者同事,和他们一起讨论讨论,交流是学习的催化剂。我在评论区等你。

精选留言

  • qinsi

    2021-09-16 12:55:19

    不是很懂spark。mapPartitions()那里,光从代码上来看的话,在map()的闭包里可以访问到外面mapPartitions()闭包里的同一个md5实例,从而达到共享实例的效果。那么有没有可能在最外层创建一个全局的md5实例,这样就算只用map(),在闭包里访问的也是这同一个实例?这样会有什么问题吗?或者说在这种情况下mapPartitions()相比map()还有什么优势?
    作者回复

    非常好的问题~ 先赞一个 👍

    先说结论,你说的没错,通过在Driver端创建全局变量,然后在map中进行引用,也可以达到mapPartitions同样的效果。原理就是你说的函数闭包,其实本质上,它就是个对象,包含了外部数据结构的函数对象。

    但是,需要注意的是,通过这种方式创建的闭包,一定要保证Driver端创建的全局变量是可以序列化的,也就是实现了Serializable接口。只有满足这个前提,Spark才能在这个闭包之上创建分布式任务,否则会报“Task not serializable”错误。

    这个错误其实很好理解,要把Driver创建的对象带到Executors,这个对象一定是要可序列化的才行。不过遗憾的是,很多共享对象,比如咱们本讲的MD5,都没有实现Serializable接口,所以Driver端创建的MD5实例,Spark“带不动”(没法从Driver带到Executors),这一点你不妨动手试一试~

    不过这个点提得非常好,对于那些自定义的Class,我们可以让它实现Serializable接口,从而可以轻松地在Driver创建Spark可以“带的动”的全局变量。

    说到这里,我追问老弟一句:在Driver创建MD5实例,map中直接引用,Spark会报“Task not Serializable”;那为什么在mapPartitions里面创建MD5实例,map引用这个对象,Spark却没有报“Task not Serializable”错误呢?

    老弟不妨再想一想,咱们评论区继续讨论~

    2021-09-19 17:34:38

  • Unknown element

    2021-09-24 21:41:26

    老师您回答一楼的问题时提到的序列化意思是不是对象在不同节点间传输的时候只能序列化为字符串传输?如果是的话那我觉得 在mapPartitions里面创建MD5实例,map引用这个对象,Spark却没有报“Task not Serializable”错误 是因为driver把这段代码分发到了各个executor,而创建对象这个工作是由executor完成的,所以不会报错?
    作者回复

    Perfect,正解!就是这么回事~

    2021-09-25 16:22:24

  • Geek_eb2d3d

    2021-09-28 00:54:32

    老师,我在 map 里面使用 SparkContext 或 SparkSession 创建新的 RDD,这样是可以的么?
    作者回复

    不可以,严禁这么做。

    RDD里面,禁止嵌套定义新的RDD,这个是Spark分布式开发的大忌。

    Spark的开发模式、或者是开发规范,是利用transformation,来完成分布式数据集之间的转换,从而达到处理数据的目的~

    RDD内嵌套RDD,这种属于单机思维哈,必须要跳出来~

    2021-09-29 09:20:57

  • Alvin-L

    2021-09-21 01:43:49

    Python版,虽然能跑,但是不知道对不对:
    #哈希值计数
    ```
    from hashlib import md5
    from pyspark import SparkContext


    def f(partition):
    m = md5()
    for word in partition:
    m.update(word.encode("utf8"))
    yield m.hexdigest()


    lineRDD = SparkContext().textFile("./wikiOfSpark.txt")
    kvRDD = (
    lineRDD.flatMap(lambda line: line.split(" "))
    .filter(lambda word: word != "")
    .mapPartitions(f)
    .map(lambda word: (word, 1))
    )

    kvRDD.foreach(print)

    ```

    #相邻+过滤特殊字符
    ```
    from pyspark import SparkContext

    # 定义特殊字符列表
    special_char_list = ["&", "|", "#", "^", "@"]
    # 定义判定函数f
    def f(s):
    words = s.split("-")
    b1 = words[0] in special_char_list
    b2 = words[1] in special_char_list
    return (not b1) and (not b2)


    # 定义拼接函数word_pair
    def word_pair(line):
    words = line.split(" ")
    for i in range(len(words) - 1):
    yield words[i] + "-" + words[i + 1]


    lineRDD = SparkContext().textFile("./wikiOfSpark.txt")
    cleanedPairRDD = lineRDD.flatMap(word_pair).filter(f)
    cleanedPairRDD.foreach(print)

    ```
    作者回复

    两份代码都是满分💯,👍

    感谢老弟分享、贡献Python代码,后面我们一起把它整理到GitHub里面去~

    感谢!!!

    2021-09-23 00:21:59

  • Geek_a30533

    2021-10-14 09:31:47

    对scala的函数定义格式不是很清楚,那边绕了好几次,有一个小疑问,在flatMap里的匿名函数f

    line => {
    val words: Array[String] = line.split(" ")
    for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
    }

    只定义了形参是line,那出参是整个花括号么?主要是没有return,让我一下迷了,难道是最后一个是Array[String]所以返回值就是这个?不用声明吗?
    作者回复

    对的~

    确实,Scala的语法比较简洁(偷懒),return不是必须的,如果函数体没有return,那么最后一行代码的输出,就是整个函数的返回值~

    具体到这个匿名函数,它的返回结果就是你说的Array[String],后续flatMap再把这里的Array展平

    2021-10-15 16:50:23

  • 子兮

    2021-09-26 23:05:41

    老师,请问,一个变量m被filter 算子内部计算时使用,若m定义在了filter算子所在函数内,也就是变量和算子在同一个函数里,就不会报序列化问题,若定义在了函数之外就会报序列化问题,这是为什么?看了一些解释,说是函数闭包,也不是很理解
    作者回复

    可以参考Unknown Element的回答~ m在filter外,是driver创建,需要走网络、序列化。而m在filter内,则是在executor创建,不需要从driver分发过去

    2021-09-28 20:10:12

  • 大叮当

    2021-09-16 14:58:16

    同问老师,AIK问的问题,什么时候用小括号什么时候用花括号啊,感觉scala实在有点过于灵活
    作者回复

    Scala语法确实比较灵活,一般来说,简单表达式用(),复杂表达式用{}。

    比如,简单函数体:(x => x + 1)、(_ + 1),等等;
    复杂函数体:{case x: String => “一大堆关于x的转换” }

    2021-09-16 22:59:43

  • Spoon

    2022-03-26 21:14:08

    最后filter更建议用Set这种数据结构进行contain查询,set的算法复杂度为O(1)
    本节内容Java版本实现
    https://github.com/Spoon94/spark-practice/blob/master/src/main/java/com/spoon/spark/core/TransformOpJob.java
  • 木之上

    2022-02-23 00:17:13

    老师,在学习这些算子的时候,像map,flatmap是否可以类比JAVA8的lambda表达式+stream流去学习
    作者回复

    是的,没有问题~ JAVA8开始支持函数式编程,它的stream本质上和咱们这里的算子是一样的~

    2022-02-24 11:23:52

  • DMY

    2021-10-20 16:18:27

    数据做map映射是以元素为粒度,执行f函数;
    这里业务场景中,f函数需要调用rpc,每个数据调一次rpc+数据量大就会非常耗时。
    所以想把一组数据打包成一个list减少rpc调用,来提高效率,这里要怎么处理呢
    作者回复

    可以参考下面的方式来实现:

    import scala.collection.mutable.ArrayBuffer

    def f(iter: Iterator[Int]) : Iterator[Int] = {
    val dataSeq: ArrayBuffer[Int] = new ArrayBuffer()
    while (iter.hasNext) {
    val cur = iter.next;
    dataSeq += cur
    }
    // 你的处理逻辑
    val res = yourRPCFunction(dataSeq)
    // 以迭代器的形式,返回计算结果
    res.iterator
    }

    // 处理RDD的时候,用mapPartitions调用f即可
    data.mapPartitions(f)

    2021-10-22 15:59:17

  • 孙浩

    2021-11-05 16:13:17

    有疑问,吴老师,PariedRDD中的(K,V),1.对应的数据类型应该是scala中的元组吧?2.reduceByKey为啥不支持元素是map类型?或者如果我存在一个RDD[Map[String,Int]],我想做reduceByKey操作,应该怎么实现?
    作者回复

    好问题,
    1)(K,V)可以理解成是元组,这个没有问题
    2)关于reduceByKey为啥不支持RDD[Map[String,Int]],这个其实老弟需要充分理解PairRDD,就RDD[Map[String,Int]]这种类型来说,本质上,他其实是RDD[KeyType],而不是RDD[(KeyType,ValueType)]。也就是说,RDD[Map[String,Int]]不是PairRDD,所以不仅reduceByKey不支持,其他*ByKey类操作也不支持。如果把它变换一下,比如RDD[(String,Map[String,Int])],这样是可以的,因为这个类型属于PairRDD,KeyType是String,而ValueType是Map[String,Int]~

    2021-11-09 13:17:14

  • Geek_390836

    2021-09-16 08:46:38

    参考map和mapPartitions,为什么mapPartitions中的map,是对record进行getbytes而不是word.getbytes操作,刚学spark,求老师解答
    作者回复

    好问题~

    你说的对,这里笔误了,已经让编辑帮忙改成了“word.getbytes”哈~

    感谢纠正~

    2021-09-16 23:16:48

  • 请输入昵称

    2022-04-25 20:56:18

    课后回答
    第一题:应用为按照分区索引做定制化操作
    第二题:共性为都是转换算子,并非行动算子
    第三题:之前没用过 mapPartitions 算子,也不知道,看来下代码,发现了可以很多用这个算子做优化的地方
  • 钱鹏 Allen

    2021-09-18 00:20:28

    mapPartitionsWithIndex 需要结合分区索引使用
    map filter flatMap mapPartitions mapPartitionsWithIndex 通过函数,传递参数,返回新的RDD
    mapPartitions 采集系统中,只需实例化一次电表号,可读其他各类读数
    作者回复

    正解~

    2021-09-19 19:35:21

  • jasonde

    2024-06-03 05:35:17

    老师,麻烦咨询一下用map, mappartition, 是否也可以实现flatmap 的结果呢,劳烦指导,谢谢了
  • Geek_919d2f

    2023-09-01 11:36:12

    为什么不可以把val md5 = MessageDigest.getInstance("MD5") 这一段,提取到方法外面,这样就可以不使用mapPartitions,不用再套一层逻辑了
  • 我爱夜来香

    2022-04-13 14:31:46

    老师,map算子针对一个个元素进行f操作,这个元素是咋定义的,一行?
  • 艾利特-G

    2022-01-03 21:42:46

    // 相邻词汇计数统计
    package com.shouneng

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object NeighboringWordCount {
    def main(args: Array[String]) {
    try {
    val conf =
    new SparkConf().setAppName("NeighboringWordCount").setMaster("local[1]")
    val sparkContext = new SparkContext(conf)
    val rootPath: String = "file://"
    val file: String =
    s"${rootPath}/home/shouneng/geektime/Getting_Started_with_Spark/chapter01/wikiOfSpark.txt"

    // 读取文件内容
    val lineRDD: RDD[String] = sparkContext.textFile(file)
    // val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
    // val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

    val wordPairRDD: RDD[String] = lineRDD.flatMap(line => {
    val words: Array[String] =
    line.split(" ").filter(word => !word.equals(""))
    for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)
    })
    val kvRDD: RDD[(String, Int)] = wordPairRDD.map(wordPair => (wordPair, 1))
    val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

    wordCounts
    .map { case (k, v) => (v, k) }
    .sortByKey(false)
    .take(5)
    .foreach(println)
    } catch {
    case ex: Exception => {
    ex.printStackTrace() // 打印到标准err
    System.err.println("exception===>: ...") // 打印到标准err
    }
    }
    }
    }
    作者回复

    赞👍,感谢老弟的整理~

    2022-01-09 17:11:02

  • 孙浩

    2021-11-05 16:52:36

    吴老师,mapPartitions的是不是也避免了对象锁的问题,因为partitions对应的也是任务数。
    作者回复

    并没有哈,在运行时,map和mapPartitions都是以分区为粒度,创建分布式任务。换句话说,map和mapPartitions两者在运行时,Tasks的数量是一致的,所以并不存在你说的任务与任务之间对象锁的问题

    2021-11-09 13:20:36

  • 小强

    2021-10-24 13:07:34

    import org.apache.spark.rdd.RDD

    // 这里的下划线"_"是占位符,代表数据文件的根目录
    val rootPath: String = "/home/gxchen/Spark/01-wordCount"
    val file: String = s"${rootPath}/wikiOfSpark.txt"

    // 读取文件内容
    val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

    // 以行为单位做分词
    val wordPairRDD: RDD[String] = lineRDD.flatMap(line => {
    val words: Array[String] = line.split(" ")
    for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
    })



    // 定义特殊字符列表
    val list: List[String] = List("&", "|", "#", "^", "@")

    // 定义判定函数f
    def f(s: String): Boolean = {
    val words: Array[String] = s.split("-")
    val b1: Boolean = list.contains(words(0))
    val b2: Boolean = list.contains(words(1))
    return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
    }

    // 使用filter(f)对RDD进行过滤
    val cleanWordPairRDD: RDD[String] = wordPairRDD.filter(f)

    // 把RDD元素转换为(Key,Value)的形式
    val kvRDD: RDD[(String, Int)] = cleanWordPairRDD.map(wordPair => (wordPair, 1))
    // 按照单词做分组计数
    val wordPairCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

    // 打印词频最高的5个词汇
    wordPairCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)


    运行了以上代码后,报错。想了半天也没弄明白,求助一下!
    21/10/24 04:58:42 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 3)
    java.lang.ArrayIndexOutOfBoundsException: 1
    at $line33.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.f(<console>:29)
    at $line34.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$cleanWordPairRDD$1(<console>:27)

    21/10/24 04:58:42 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3) (localhost executor driver): java.lang.ArrayIndexOutOfBoundsException: 1

    21/10/24 04:58:42 ERROR TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job
    21/10/24 04:58:42 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)

    21/10/24 04:58:42 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (localhost executor driver): java.lang.ArrayIndexOutOfBoundsException: 0
    作者回复

    可以参考评论区“崔小豪”同学下面的回复,属于同一类问题:数组越界~

    2021-10-26 23:16:36