残月的技术日志

残月的技术日志

SparkRDD

2023-06-18

RDD

RDD(Resilient Distributed Dataset)即弹性分布式数据集,数据集的全部或部分可以存储在集群的多台机子的内存上,当内存不够时,数据也可持久化到硬盘

RDD的主要特征:

  • RDD都是只读的,但可以将RDD转换为新的RDD

  • RDD是可分区的,每个分区对应一个Tesk执行

  • 对RDD的操作,相对于对RDD某个分区操作

  • RDD拥有一系列的分区技术函数,称为算子

  • RDD之间存在依赖关系,可以实现管道化

转换算子负责对RDD中的数据进行计算并转换为一个新的RDD

Spark中所有算子都是惰性的,只有遇到行动算子才会一起执行

下面先使用Spark Shell操作

[root@master ~]# spark-shell
Spark context Web UI available at http://192.168.159.136:4041
Spark context available as 'sc' (master = local[*], app id = local-1665305112084).
Spark session available as 'spark'.
Welcome to
____              __
/ __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 2.1.1
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

创建RDD

通过对象创建

Spark可以通过parallelize()或makeRDD()将一个对象集合转换为RDD

scala> // 创建Scala集合

scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)

scala> val rdd = sc.makeRDD(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:26

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4)

通过本地文件创建

Spark可以通过textFile()读取本地文件系统文件,并按行拆分,转换为RDD

创建一个文本文件,在/root/text.txt

[root@master ~]# cat text.txt
hello scala
hello spark
hello bigData
scala> val rdd = sc.textFile("/root/text.txt")
rdd: org.apache.spark.rdd.RDD[String] = /root/text.txt MapPartitionsRDD[4] at textFile at <console>:24

scala> rdd.collect
res2: Array[String] = Array(hello scala, hello spark, hello bigData)

通过HDFS文件系统中文件创建

与前者类似,就在路径前加上hdfs://hostname:post就行

scala> val rdd = sc.textFile("hdfs://master:9000/text.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/text.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> rdd.collect
res6: Array[String] = Array(hello scala, hello spark, hello bigData)

RDD转换算子

map

Map算子可以对RDD中每个元素进行转换,如何作为结果RDD中对应位置元素的值

scala> val rdd = sc.makeRDD(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24

scala> val newRdd = rdd.map(_+1)
newRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at map at <console>:26

scala> newRdd.collect
res9: Array[Int] = Array(2, 3, 4, 5)

RDD是只读的,所以算子对RDD的修改不会影响源RDD,但会返回一个新的RDD

flatMap

与map类似,不过flatMap算子对每个元素的转换的结果可以是0、1或多个结果,然后将所有结果合并到一个RDD中

scala> val rdd = sc.makeRDD(List("a","b c","d"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at makeRDD at <console>:24

scala> val newRdd = rdd.flatMap(_.split(" "))
newRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at flatMap at <console>:26

scala> newRdd.collect
res10: Array[String] = Array(a, b, c, d)

filter

filter算子可以过滤掉一些元素,然后将剩下的元素放入一个新的RDD

scala> val rdd = sc.makeRDD(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24

scala> val newRdd = rdd.filter(_>2)
newRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at filter at <console>:26

scala> newRdd.collect
res11: Array[Int] = Array(3, 4)

reduceByKey

reduceByKey只能用于(key,value)样式的scala一维长度为2元组,默认且只能指定第一个元素为key,第二个为value

将相同的key的元素进行聚合,value进行计算,然后放在一个新的RDD中返回,返回的类型也是(key,value)

例,相同key,value求和

scala> val rdd = sc.makeRDD(List(("A",12),("A",13),("B",1),("B",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at makeRDD at <console>:24

scala> val newRDD = rdd.reduceByKey((v1,v2) => v1 + v2)
newRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26

scala> newRDD.collect
res14: Array[(String, Int)] = Array((A,25), (B,3))

groupByKey

类似前者,源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

将相同的key的元素聚合,value都丢在一个集合内,然后放在一个新的RDD中返回,返回的类型是(key,CompactBuffer(value.......))

scala> val rdd = sc.makeRDD(List(("A",12),("A",13),("B",1),("B",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24

scala> val newRDD = rdd.groupByKey()
newRDD: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:26

scala> newRDD.collect
res15: Array[(String, Iterable[Int])] = Array((A,CompactBuffer(12, 13)), (B,CompactBuffer(1, 2)))

union

用于将两个相同数据类型RDD进行合并,并放回一个合并后的RDD

scala> val rdd1 = sc.makeRDD(Array(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[33] at union at <console>:28

scala> rdd3.collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6)

sortBy

sortBy算子可以将RDD内的元素进行排序,并返回新的RDD

sortBy第一个参数是排序函数,可以用于指定排序依据,第二个参数是布尔值,表示是否升序排序(即如果要降序,就false)

第二个参数默认为true(默认升序)

scala> val rdd = sc.makeRDD(Array(("xiaomin",75),("xiaohua",85),("xiaodong",83)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[57] at makeRDD at <console>:24

scala> val newRDD = rdd.sortBy(_._2)
newRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[62] at sortBy at <console>:26

scala> newRDD.collect
res21: Array[(String, Int)] = Array((xiaomin,75), (xiaodong,83), (xiaohua,85))

scala> newRDD = rdd.sortBy(_._2,false)
newRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[67] at sortBy at <console>:26

scala> newRDD.collect
res22: Array[(String, Int)] = Array((xiaohua,85), (xiaodong,83), (xiaomin,75))

sortByKey

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

按照key进行排序

只有一个参数表示是否升序排序,默认true,即默认升序

scala> val rdd = sc.makeRDD(Array((2,"A"),(1,"B"),(3,"C")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[75] at makeRDD at <console>:24

scala> val newRDD = rdd.sortByKey(false)
newRDD: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[78] at sortByKey at <console>:26

scala> newRDD.collect
res25: Array[(Int, String)] = Array((3,C), (2,A), (1,B))

scala> val newRDD = rdd.sortByKey()
newRDD: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[81] at sortByKey at <console>:26

scala> newRDD.collect
res26: Array[(Int, String)] = Array((1,B), (2,A), (3,C))

join

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

join算子做的是内连接,是将两个RDD工具key进行连接,并只返回两个RDD都匹配的内容

scala> val rdd1 = sc.makeRDD(Array(("A","a"),("B","b")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[83] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("B","b1"),("B","b2"),("C","c")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[84] at makeRDD at <console>:24

scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[87] at join at <console>:28

scala> rdd3.collect
res27: Array[(String, (String, String))] = Array((B,(b,b1)), (B,(b,b2)))

leftOuterJoin

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

leftOuterJoin算子做的是左外连接,是将两个RDD工具key进行连接,左边RDD会都存在,右边RDD只保留匹配内容

scala> val rdd2 = sc.makeRDD(Array(("B","b1"),("B","b2"),("C","c")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[89] at makeRDD at <console>:24

scala> val rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (String, Option[String]))] = MapPartitionsRDD[92] at leftOuterJoin at <console>:28

scala> rdd3.collect
res28: Array[(String, (String, Option[String]))] = Array((A,(a,None)), (B,(b,Some(b1))), (B,(b,Some(b2))))

rightOuterJoin

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

leftOuterJoin算子做的是右外连接,是将两个RDD工具key进行连接,左边RDD只保留匹配内容,右边RDD会都存在

scala> val rdd1 = sc.makeRDD(Array(("A","a"),("B","b")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[93] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("B","b1"),("B","b2"),("C","c")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[94] at makeRDD at <console>:24

scala> val rdd3 = rdd1.rightOuterJoin(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Option[String], String))] = MapPartitionsRDD[97] at rightOuterJoin at <console>:28

scala> rdd3.collect
res29: Array[(String, (Option[String], String))] = Array((B,(Some(b),b1)), (B,(Some(b),b2)), (C,(None,c)))

fullOuterJoin

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

leftOuterJoin算子做的是全外连接,是将两个RDD工具key进行连接,两边RDD不管匹不匹配的元素会都存在

scala> val rdd1 = sc.makeRDD(Array(("A","a"),("B","b")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[98] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("B","b1"),("B","b2"),("C","c")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[99] at makeRDD at <console>:24

scala> val rdd3 = rdd1.fullOuterJoin(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Option[String], Option[String]))] = MapPartitionsRDD[102] at fullOuterJoin at <console>:28

scala> rdd3.collect
res30: Array[(String, (Option[String], Option[String]))] = Array((A,(Some(a),None)), (B,(Some(b),Some(b1))), (B,(Some(b),Some(b2))), (C,(None,Some(c))))

intersection

返回两个RDD元素的交集组成的新RDD

scala> val rdd1 = sc.makeRDD(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[105] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(3 to 7)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[106] at makeRDD at <console>:24

scala> rdd1.intersection(rdd2).collect
res32: Array[Int] = Array(4, 5, 3)

cogroup

对两个数据类型是(key,value)样式的scala一维长度为2元组的RDD先按照Key进行组合,然后工具key进行并集操作

一种合并,左边有的放左边,右边有的放右边,没有留空

scala> val rdd1 = sc.makeRDD(Array(("A","a"),("B","b")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[117] at makeRDD at <console>:24

scala> val rdd2 = sc.makeRDD(Array(("B","b1"),("B","b2"),("C","c")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[118] at makeRDD at <console>:24

scala> rdd1.cogroup(rdd2).collect
res34: Array[(String, (Iterable[String], Iterable[String]))] = Array((A,(CompactBuffer(a),CompactBuffer())), (B,(CompactBuffer(b),CompactBuffer(b1, b2))), (C,(CompactBuffer(),CompactBuffer(c))))

distinct

返回一个源RDD去重后的RDD

scala> val rdd1 = sc.makeRDD(Array(1,2,3,3,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[113] at makeRDD at <console>:24

scala> rdd1.distinct.collect
res33: Array[Int] = Array(4, 1, 5, 6, 2, 3)

注意,会乱序

RDD行动算子

Spark中对RDD的操作都惰性的,只有遇到行动算子才会触发计算

所以,在使用转换算子时出现的问题,可能不会立即显现

行动算子有一些这些

  1. reduce

  2. collect

  3. count

  4. first

  5. take

  6. takeOrdered

  7. aggregate

  8. fold

  9. countByKey

  10. save相关算子

  11. foreach

拿几个常用的算子举例

reduce

将RDD中元素进行聚合,直接返回聚合结果

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd.reduce((a,b) => a+b)
res0: Int = 5050

行动算子返回不一定是RDD

count

统计并返回RDD中元素个数

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24

scala> rdd.count
res1: Long = 100

countByKey

源RDD数据类型只能是(key,value)样式的scala一维长度为2元组

按照key分类,然后按照可以统计个数,返回scala.collection.Map

scala> val rdd = sc.makeRDD(Array(("A",1),("B",1),("A",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd.countByKey
res2: scala.collection.Map[String,Long] = Map(A -> 2, B -> 1)

take(n)

返回RDD前n个元素组成的数组

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd.take(10)
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

如果是取第一个,可以使用first算子

分区

RDD可以分为多个分区,各个分区放在各个节点上

分区数量

RDD中各个分区中的数据可以并行计算,Spark会给每个分区分配一个Task任进行计算

RDD通常默认分区数量为你的集群的CPU核心数

查看分区数量

我可以使用getNumPartitis查看分区数量

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> rdd.getNumPartitions
res9: Int = 4

getNumPartitis默认是使用当前设备的核心数;我是用了台4核心的虚拟机。可以看出,与核心数一致

指定分区数量

一般来说,在创建RDD的函数的第二个参数传入一个Int类型的数,就可以指定RDD分区数量

scala> val rdd = sc.makeRDD(1 to 100,20)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24

scala> rdd.getNumPartitions
res10: Int = 20

分区数不受核心数限制

scala> val rdd = sc.textFile("/root/text.txt",1)
rdd: org.apache.spark.rdd.RDD[String] = /root/text.txt MapPartitionsRDD[14] at textFile at <console>:24

scala> rdd.getNumPartitions
res12: Int = 1

注意,textFile中第二个参数只是最小分区数量,具体Spark可能会按照文件大小等一些因素决定


所以,我一开始的话可能有点片面,实际上要看看编辑器的提示或源码,视情况而定

自定义分区器

分区规则是由分区控制器(Partitioner)控制,Spark的主要分区类是HashPartitioner和RangePartitioner,他们都继承自抽象类Partitioner,我们可以实现Partitioner类达到自定义分区控制器

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

/**
 *  @author 20软件林泰圣
 */
object PartitionerTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("PartitionerTest")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val dataRDD:RDD[(String,(String,String))] = sc.makeRDD(
      Array(
        ("cat",("mimi","white")),
        ("cat",("tom","blue")),
        ("dog",("wancai","black"))
      )
    )

    //写到HDFS
    dataRDD
      .partitionBy(new MyPartitioner(3))   //自定义分区器
      .saveAsTextFile("hdfs://192.168.159.136:9000/output")    //保存到hdfs
  }
}

/**
 * 自定义分区器
 * @param partitionNum 分区数量
 */
class MyPartitioner(partitionNum:Int) extends Partitioner {
  //获得分区数量
  override def numPartitions: Int = partitionNum
  //获得分区ID,即那个分区,分区ID为Int类型
  override def getPartition(key: Any): Int = {
    val project = key.toString
    if(project.equals("cat")){          //猫:0
      0
    }else if(project.equals("dog")){    //狗:1
      1
    }else{                              //其他:2
      2
    }
  }
}
[root@master ~]# hadoop dfs -ls /output

Found 4 items
-rw-r--r--   3 ÁÖ̩ʥ supergroup          0 2022-10-10 01:18 /output/_SUCCESS
-rw-r--r--   3 ÁÖ̩ʥ supergroup         36 2022-10-10 01:18 /output/part-00000
-rw-r--r--   3 ÁÖ̩ʥ supergroup         21 2022-10-10 01:18 /output/part-00001
-rw-r--r--   3 ÁÖ̩ʥ supergroup          0 2022-10-10 01:18 /output/part-00002
[root@master ~]# hadoop dfs -cat /output/part-00000

(cat,(mimi,white))
(cat,(tom,blue))
[root@master ~]# hadoop dfs -cat /output/part-00001

(dog,(wancai,black))
[root@master ~]# hadoop dfs -cat /output/part-00002

RDD持久化

SparkRDD是懒加载的,只有遇到行动算子才会从头开始计算所有RDD,当遇到一个RDD被多次使用,就会严重影响性能,这时候可以通过RDD持久化避免重复计算

在RDD上进行persist()或cache()可以对RDD进行持久化;cache()底层调用persist(),不可更改级别

存储级别

Storage Level

Meaning

MEMORY_ONLY

将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别。

MEMORY_AND_DISK

将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们。

MEMORY_ONLY_SER

将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。

MEMORY_AND_DISK_SER

和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。

DISK_ONLY

仅仅将RDD分区存储到磁盘中

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

和上面的存储级别类似,但是复制每个分区到集群的两个节点上面

OFF_HEAP (experimental)

以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力。

持久化

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at <console>:24

scala> rdd.cache
res13: rdd.type = ParallelCollectionRDD[16] at makeRDD at <console>:24

scala> rdd.persist
res14: rdd.type = ParallelCollectionRDD[16] at makeRDD at <console>:24

默认持久化到内存中,可切换等级调整到硬盘,具体看什上面的存储级别

Spark程序执行结束后,cache()与persist()中的内容会被清空

cache()与persist()操作也只能在遇到行动算子后才会执行持久化

RDD的持久化方法,同样适用于后面会出现的DataFrame以及DataSet

缓存

cache() 是persist的一种 准确说是 persist(StorageLevel.MEMORY_ONLY)。

RDD检查点

cache()与persist()中的内容会被清空,无法长期保存

检查点可以将RDD状态保存在硬盘中,在需要的时候又可以还原

scala> sc.setCheckpointDir("/root/checkpoint")

scala> sc.setCheckpointDir("/root/checkpoint")    //设置检查点路径

scala> val rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.checkpoint
[root@master ~]# ls -l checkpoint/
总用量 0
drwxr-xr-x 2 root root 6 10月 10 02:58 56326811-6331-4a7c-884b-fb09a2a15954
drwxr-xr-x 2 root root 6 10月 10 02:58 d2a3592b-c36b-4a57-8488-626da9b9d425

检查点只有在遇到行动算子后才会保存

当下次行动算子计算时,将直接调用检查点数据,不需要从头计算

共享变量

通常情况下(集群),Spark应用程序会将个算子中的函数分配到多个Worker节点运行,若一个算子使用了某个外部变量,那该变量就会被复制到该Worker节点上的每个Task任务中。由于各个Task相互独立,当该变量存储的数据非常大时(例如存储了100M的SCV文件的源数据),那就回导致网络传输以及内存开销明显加大,因此可能会导致些许性能问题

Spark提供了两种共享变量,广播变量以及累加器,学过Flink这些应该不会陌生

默认的变量传递

scala> val arr = Array(1,2,3,4,5,6)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd.map(x => (x,arr)).collect
res4: Array[(String, Array[String])] = Array((A,Array(A, B, C, D)), (B,Array(A, B, C, D)), (C,Array(A, B, C, D)), (D,Array(A, B, C, D)))

这是个简单的案例,有一个存放在Driver的外部变量arr,arr被用在map算子中,arr将被发送给每个Task

但要是arr是一个100M的数据,那每个Task去维护100M大小的数据副本,要是某个Executor启动了4个Task,那就共有400M是数据副本,消耗内存不说,还有带来较大的网络开销

广播变量

广播变量是一个在每个Worder节点的一个只读缓存,该变量无论你有几个Task都只发送一次,每个Worder节点也只存在一个广播变量

scala> val broadcastArr = sc.broadcast(arr)
broadcastArr: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)

scala> rdd.map(x => (x,broadcastArr)).collect
res5: Array[(String, org.apache.spark.broadcast.Broadcast[Array[Int]])] = Array((A,Broadcast(2)), (B,Broadcast(2)), (C,Broadcast(2)), (D,Broadcast(2)))

可以发现,RDD中每个元素使用org.apache.spark.broadcast.Broadcast对象,它只在执行行动算子后复制一次到各个Worder节点,不管你Executor启动了几个Task,这个Worder节点只有一个变量

Broadcast对象只是一个简单的封装,你可以使用.value方法得到里面的值

scala> broadcastArr.value
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6)

累加器

广播变量是将Driver中的变量广播并缓存到各个Worker节点,那累加器就是可以让Worker访问Driver中的特殊变量

若不是用累加器。由于Driver中的变量是复制到各个Worder的各个Task中,那就有一个问题,在Task中的修改,不影响Driver的外部变量

如下:

scala> var sum = 0
sum: Int = 0

scala> val rdd = sc.makeRDD(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd.foreach(x => sum += x)

scala> print(sum)
0

Driver中sum值未被更改,更改的仅仅是各个Task中被复制过去的sum

这时候可以使用累加器,累加器使用add()方法累加,使用value()方法取值

scala> val myAcc = sc.longAccumulator("myAcc")
myAcc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 480, name: Some(myAcc), value: 0)

scala> val rdd = sc.makeRDD(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> rdd.foreach(x => myAcc.add(x))

scala> print(myAcc.value)
15

扩充样例

SparkRDD二次排序

在项目目录下,存在一个文件sort.txt,内容如下

4 5
2 6
4 6
3 5
1 5
8 6
4 7

要求,每行按照第一个数字升序排序,当第一个数字相同时,按照第二个数字降序排序

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 20软件林泰圣
 */
object Sort01 {

  class SecondSortKey(val firstNum:Int,val secondNum:Int) extends Ordered[SecondSortKey] with Serializable {
    override def compare(that: SecondSortKey): Int = {
      //判断第一个数字是否相同
      if(this.firstNum != that.firstNum){
        this.firstNum - that.firstNum
      }else{
        that.secondNum - this.secondNum
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Sort01")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val pairRDD:RDD[(SecondSortKey,String)] = sc
      .textFile("./sort.txt")
      .map(d => {
        (new SecondSortKey(
          d.split(" ")(0).toInt,
          d.split(" ")(1).toInt
        ),d)
      })

    val sortRDD = pairRDD
      .sortByKey()              //将按照SecondSortKey排序
      .map(_._2)

    sortRDD.collect().foreach(d => {
      println(d)
    })
  }
}
1 5
2 6
3 5
4 7
4 6
4 5
8 6

SparkRDD数据倾斜问题解决

案例来自《Spark大数据分析实战》

如果一个Spark作业有两个Stage(),Stage2依赖与Stage1时,Stage2必须要等另一个Stage1完成时才能往下做

这时要是Stage1的任务量非常庞大,假如要1小时,Stage2只需要1秒,这个任务分配不均的现象就叫数据倾斜

数据倾斜会影响Spark应用程序的执行效率以及资源利用率

解决数据倾斜的几个方法

  1. 数据预处理

    假设Spark数据都来自Hive或MySQL,那可以先在上面对数据进行预处理,尽量保证数据均匀,或者是先对数据进行一次聚合,在传入Spark时就不要那么多次的reduceByKey()操作,就能减少Shuffle操作,缓解数据倾斜

  2. 过滤掉导致数据倾斜的key

    要是导致数据倾斜的key本身无意义,本身不参与计算或对结果无影响,那可以讲该key过滤掉

  3. 提高Shuffle的并行度

    Spark RDD的Shuffer过程与MapReduce类似,会涉及数据重组和从分区,如果并行度设置不合适,那可能会导致多个Key被分配到一个分区,使得某一Task任务过大,影响性能

    在使用聚合算子(xxxByKey相关)时,可以通过参数传入并行度,给原先分多个key的Task的任务分配到多个Task上,缓解问题

  4. 通过随机Key前缀进行双重聚合

    在相同key中加上随机前缀,使得相同key被拆分到不同的key,就可以让原先分配在一个分区的key分配到多个分区,从而分配到多个key

案例

有一数据文件,存放在hdfs://master:9000/test/word.txt

[root@master ~]# hadoop dfs -cat /test/word.txt
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello spark hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello spark hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello scala hello hello hello hello hello hello hello hadoop hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello spark hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello

数据中hello量过大,若不处理,再进行聚合操作时,将发生数据倾斜

这时候我们使用随机Key前缀的方法环境数据倾斜问题,通过随机的key,能将量大的Hello分配给多个Stage,这样就避免一个Stage工作量过大的问题

import org.apache.spark.sql.SparkSession

import scala.util.Random

object DataLean {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataLean")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext

    val wordRDD = sc.textFile("hdfs://master:9000/test/word.txt")

    val result = wordRDD
      .flatMap(_.split(" "))
      .map(x => {                          //添加前缀
        val random = Random.nextInt(10)    //0-9随机数
        (random + "_" + x ,1)
      })
      .reduceByKey((a,b) => {a + b})
      .map(x => {                          //去除前缀
        val word = x._1.split("_")(1)
        val count = x._2
        (word, count)
      })
      .reduceByKey((a,b) => {a + b})       //总聚合

    println(result.collect().mkString("Array(", ", ", ")"))

  }
}
Array((scala,1), (hello,275), (spark,3), (hadoop,1))

  • 0