admin 管理员组

文章数量: 1184232

1)coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
该函数用于将RDD进行重分区,使用HashPartitioner。
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

作用
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

val ListRDD: RDD[Int]= context.makeRDD(Range(1,15,2),4)
println("缩减分区前: ")
ListRDD.glom.collect.foreach(data => println(data.mkString(",")))val coalesceRDD: RDD[Int]= ListRDD.coalesce(3)
println("缩减分区后: ")
coalesceRDD.glom.collect.foreach(data => println(data.mkString(",")))

输出:
缩减分区前:
1
3,5
7,9
11,13
缩减分区后:
1
3,5
7,9,11,13

将coalesce参数设置为2:
缩减分区前:
1
3,5
7,9
11,13
缩减分区后:
1,3,5
7,9,11,13

coalesce进行收缩合并分区,减少分区的个数,并没有shuffle操作,但这块也有隐患,数据倾斜是一个问题.

  1. repartition
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
    } 不同于coalesce算子的是,repartition算子必须进行shuffle操作

    将上述coalesce改成repartition算子
    缩减分区前:
    1
    3,5
    7,9
    11,13
    缩减分区后:
    5,13
    1,7
    3,9,11
    参数3改成2

    缩减分区前:
    1
    3,5
    7,9
    11,13
    缩减分区后:
    1,5,7,11
    3,9,13

    比之coalesce算子,shuffle操作导致效率更低了,但是数据倾斜好点,开发中避免shuffle操作更好,提升效率,数据倾斜有其他的处理方式
  2. union
    1.作用: 对源RDD和参数RDD求并集后返回一个新的RDD
    2.需求: 创建两个RDD,求并集
val ListRDD: RDD[Int]= context.makeRDD(1 to 10,4)val ListRDD2: RDD[Int]= context.makeRDD(5 to 12)val unionRDD: RDD[Int]= ListRDD.union(ListRDD2)
    unionRDD.collect.foreach(data => print(data +" "))

输出:1 2 3 4 5 6 7 8 9 10 5 6 7

  1. subtract
    1.作用: 计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
    2.需求: 创建两个RDD,求第一个RDD与第二个RDD的差集
    将上述函数union改成subtract

    输出:4 8 1 9 2 10 3

5).cartesian
1.作用: 笛卡尔积(尽量避免使用)
2.创建两个RDD,计算两个RDD的笛卡尔积

val ListRDD: RDD[Int]= context.makeRDD(1 to 3,4)val ListRDD2: RDD[Int]= context.makeRDD(5 to 7)val cartesianRDD: RDD[(Int,Int)]= ListRDD.cartesian(ListRDD2)
    cartesianRDD.collect.foreach(data => print(data +" "))

输出:(1,5) (1,6) (1,7) (2,5) (2,6) (2,7) (3,5) (3,6) (3,7)
6) zip
1.作用将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常.
2.需求:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)的RDD

val ListRDD: RDD[Int]= context.makeRDD(1 to 3,2)val ListRDD2: RDD[Int]= context.makeRDD(5 to 7,2)val cartesianRDD: RDD[(Int,Int)]= ListRDD.zip(ListRDD2)
    cartesianRDD.collect.foreach(data => print(data +" "))

7)partitionBy案例
1.作用:对partitionRDD进行分区操作,如果原有的partitionRDD和现有的partition是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程.
2.需求:创建一个4个分区的RDD,对其重新分区

val ListRDD: RDD[(Int,String)]= context.makeRDD(Array((1,"asp"),(2,"scala"),(3,"spark"),(4,"Python")),4)val HashRDD: RDD[(Int,String)]= ListRDD.partitionBy(new org.apache.spark.HashPartitioner(2))
    HashRDD.mapPartitionsWithIndex {case(nums,datas)=> datas.map((_," 分区:"+nums))}.collect.foreach(println)

输出:
((2,scala), 分区:0)
((4,Python), 分区:0)
((1,asp), 分区:1)
((3,spark), 分区:1)
底层实现:

class HashPartitioner(partitions:Int)extends Partitioner {
  require(partitions >=0, s"Number of partitions ($partitions) cannot be negative.")def numPartitions:Int= partitions
  def getPartition(key:Any):Int= key match{casenull=>0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)}
def nonNegativeMod(x:Int, mod:Int):Int={val rawMod = x % mod
    rawMod +(if(rawMod <0) mod else0)}

如果获取到的key是null,则被分到0分区
其他key调用nonNegativeMod函数(x:Int,mod:Int) x指的是当前key的hashCode值,用x 取模 分区数,这个余数就是新的分区号

利用这一点我们可以自定义分区器
8)自定义分区器
val ListRDD: RDD[(String, Any)] = context.makeRDD(Array((“Python”, “人生苦短”), (“Scala”, 3), (“Spark”, “内存计算”), (“Hadoop”, “大数据存储”), ("", “没有Key”)), 4)

val MypartitionRDD: RDD[(String,Any)]= ListRDD.partitionBy(new Mypartition(3))
    MypartitionRDD.mapPartitionsWithIndex {case(nums, datas)=> datas.map((_," 分区:"+ nums))}.collect.foreach(println)}}class Mypartition(partition:Int)extends Partitioner {overridedef numPartitions:Int={
    partition
  }overridedef getPartition(key:Any):Int= key match{case key if key.toString.contains("a")=>0//key中包含字符a 的分到0分区case key if("").equals(key.toString)=>1//key中是null 的分到1分区case _ =>2//其余分到2分区}}

输出:
((Scala,3), 分区:0)
((Spark,内存计算), 分区:0)
((Hadoop,大数据存储), 分区:0)
((,没有Key), 分区:1)
((Python,人生苦短), 分区:2)
常用代码以省略。

本文标签: 编程 分区 缩减分区