import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 简单算子演示
  */
object FunctionDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("CreateRDD").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,3,7,8,9))

    //1.map:对RDD中每一个元素进行遍历并加以计算,返回一个全新RDD
    val rdd2: RDD[Int] = rdd.map(_ * 2)
    println(rdd2.collect().toBuffer)

    //2.filter:对RDD中每一个元素执行Boolean类型表达式,结果为ture 值反值存储到新的RDD中
    val rdd3: RDD[Int] = rdd2.filter(_ > 10)
    println(rdd3.collect().toBuffer)

    //3.flatMap:对RDD中存在集合进行压平处理,将集合内部的数据取出存储到一个全新的RDD中
    val rdd4 = sc.parallelize(Array("a b c","b c d"))
    val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
    println(rdd5.collect().toBuffer)

    //4.sample:随机抽样
    //抽样只能在一个范围内返回 ,但是范围会有一定的波动
    //参数说明
    /*
      withReplacement: Boolean, 表示抽取出数据是否返回原有样例中  true这个值会被放回抽样中 false 不会放回
      fraction: Double,  抽样比例 即 抽取30%  写入值就是 0.3(本身Double就是一个不精确数据)
      seed: Long = Utils.random.nextLong 种子, 随机获取数据的方式 ,默认不传
     */
    val rdd5_1 = sc.parallelize(1 to 10)
    val sample: RDD[Int] = rdd5_1.sample(false,0.3)
    println(sample.collect().toBuffer)

    //5.union:并集
    val rdd6 = sc.parallelize(List(5,6,7,8))
    val rdd7 = sc.parallelize(List(1,2,5,6))
    val rdd8 = rdd6  union rdd7
    println( rdd8.collect.toBuffer)

    //6.intersection:求交集
    val rdd9: RDD[Int] = rdd6 intersection rdd7
    println( rdd9.collect.toBuffer)

    //7.distinct:去重复
    println(rdd8.distinct.collect.toBuffer)

    //8.join:相同key才会被合并,没有相同的key将被舍弃掉
    val rdd10_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
    val rdd10_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
    val rdd10: RDD[(String, (Int, Int))] = rdd10_1 join rdd10_2
    println(rdd10.collect().toBuffer)

    //9.LeftOuterJoin/rightOuterJoin:左连接/右连接
    //无论是左连接还是右连接,除了基本值外 ,剩余值的数据类型是Option类型
    val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2
    val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2
    println(rdd10_4.collect.toList)
    println(rdd10_5.collect.toList)

    //10.cartesian:笛卡尔积
    val rdd11_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
    val rdd11_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
    val rdd11_3 = rdd11_1 cartesian rdd11_2
    println(rdd11_3.collect.toList)

    //11.分组
    val rdd11_4= sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("tom",2)))
    //11.1 根据传入的参数进行分组
    val rdd11_5: RDD[(String, Iterable[(String, Int)])] = rdd11_4.groupBy(_._1)
    //11.2 根据key进行分区(对KV形式是使用) -->除了指定分组之后分区的数量之外, 还可以使用自定义分区器
    val rdd11_6: RDD[(String, Iterable[Int])] = rdd11_4.groupByKey()
    //11.3 cogroup根据key进行分组(分组必须是一个对偶元组)
    val rdd11_7: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
    println(rdd11_7.collect.toBuffer)

    /*
  ps:当前方法和groupByKey都可以对数据进行分组,但是,groupByKey会将相同key的值(value)存储在一起(一个集合)
  cogroup  参数是另外一个要合并分组的RDD(必须是对偶元组),根据相同key进行额分组,但是value不会存在一个集合中
 */


  }

}

  

12-26 23:59