环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依赖jdk1.8)
  spark-1.6

一、RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
RDD特性:
(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

创建RDD:

【Spark-core学习之二】 RDD和算子-LMLPHP

【Spark-core学习之二】 RDD和算子-LMLPHP

二、Spark任务执行原理

【Spark-core学习之二】 RDD和算子-LMLPHP

以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
Driver与集群节点之间有频繁的通信。
Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了,会造成oom。
Worker是Standalone资源调度框架里面资源管理的从节点,也是JVM进程。
Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

三、Spark代码流程
1.创建SparkConf对象
可以设置Application name。
可以设置运行模式及资源需求。
2.创建SparkContext对象
3.基于Spark的上下文创建一个RDD,对RDD进行处理。
4.应用程序中要有Action类算子来触发Transformation类算子执行。
5.关闭Spark上下文对象SparkContext。

四、Transformations转换算子
Transformations 转换算子是延迟执行,也叫懒加载执行。
filter:过滤符合条件的记录数,true保留,false过滤掉。
map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。
flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
reduceByKey:将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy:作用在K,V格式的RDD上,对key进行升序或者降序排序。

【Spark-core学习之二】 RDD和算子-LMLPHP

JAVA示例:

package com.wjy.wc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction; import com.google.common.base.Optional; import scala.Tuple2; public class TransFormation { public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
JavaSparkContext sc = new JavaSparkContext(conf); //1、创建RDD 查看默认分区数,第二个参数指定分区数 未指定 默认1 分区数决定task数
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
System.out.println("rdd默认分区数:"+rdd.partitions().size()); //java创建K-V RDD使用parallelizePairs 不要使用parallelize
List<Tuple2<String, String>> list1 = Arrays.asList(
new Tuple2<String,String>("zhangsan","a"),
new Tuple2<String,String>("lisi","b"),
new Tuple2<String,String>("wangwu","c"),
new Tuple2<String,String>("zhaoliu","d")
);
List<Tuple2<String, Integer>> list2 = Arrays.asList(
new Tuple2<String,Integer>("zhangsan",1),
new Tuple2<String,Integer>("lisi",2),
new Tuple2<String,Integer>("wangwu",3),
new Tuple2<String,Integer>("niuer",4)
);
List<Tuple2<String, String>> list3 = Arrays.asList(
new Tuple2<String,String>("zhangsan","100"),
new Tuple2<String,String>("lisi","200"),
new Tuple2<String,String>("wangwu","c"),
new Tuple2<String,String>("niuer","400")
);
//返回是JavaPairRDD 不是JavaRDD
JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(list1,2);
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(list2,3);
JavaPairRDD<String, String> rdd3 = sc.parallelizePairs(list3,6); //2、join 必须作用在K-V格式RDD上 按照两个RDD的key去关联 只有相同的key才会被关联出来
// join后的分区数与父RDD分区数多的那一个相同
JavaPairRDD<String, Tuple2<String, Integer>> join = rdd1.join(rdd2);
join.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<String, Integer>> t) throws Exception {
System.out.println(t);
}
}); //3、leftOuterJoin 左连接 以左边的key为主 左边的key都会出现 返回的仍然是K-V RDD
JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftOuterJoin = rdd1.leftOuterJoin(rdd2);
leftOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Optional<Integer>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<String, Optional<Integer>>> arg0) throws Exception {
System.out.println(arg0);
//Optional 两个子类:some和none
//of有值 absent无值
/*
* (zhangsan,(a,Optional.of(1)))
(wangwu,(c,Optional.of(3)))
(zhaoliu,(d,Optional.absent()))
(lisi,(b,Optional.of(2)))
*/
String key = arg0._1;
String value1 = arg0._2._1;
Optional<Integer> optional = arg0._2._2;
if (optional.isPresent())//isPresent()判断是否有值 true有值 false无值
{
System.out.println("key="+key+",value1="+value1+",value2="+optional.get());
}
else
{
System.out.println("key="+key+",value1="+value1);
} }
}); //4、rightOuterJoin右连接 以右边的key为主 右边的key都会出现 返回的仍然是K-V RDD
JavaPairRDD<String, Tuple2<Optional<String>, Integer>> rightOuterJoin = rdd1.rightOuterJoin(rdd2);
rightOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<Optional<String>, Integer>> arg0) throws Exception {
System.out.println(arg0);
//of代表有值 absent代表无值
/*
* (zhangsan,(Optional.of(a),1))
(wangwu,(Optional.of(c),3))
(niuer,(Optional.absent(),4))
(lisi,(Optional.of(b),2))
*/
String key = arg0._1;
Integer value2 = arg0._2._2;
Optional<String> optional = arg0._2._1;
if (optional.isPresent())//isPresent()判断是否有值 true有值 false无值
{
System.out.println("key="+key+",value1="+optional.get()+",value2="+value2);
}
else
{
System.out.println("key="+key+",value2="+value2);
}
}
}); //5、fullOuterJoin 全连接 两边的key都会取到
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullOuterJoin = rdd1.fullOuterJoin(rdd2);
fullOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Optional<Integer>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>> arg0) throws Exception {
System.out.println(arg0);
}
}); //6、union 合并两个数据集。两个数据集的类型要一致。  返回新的RDD的分区数是合并RDD分区数的总和
JavaPairRDD<String, String> union = rdd1.union(rdd3);
System.out.println("union分区数:"+union.partitions().size());
union.foreach(new VoidFunction<Tuple2<String,String>>() {
private static final long serialVersionUID = 1L; @Override
public void call(Tuple2<String, String> arg0) throws Exception {
System.out.println(arg0);
}
}); //7、intersection 取两个数据集的交集 KV完全相同
JavaPairRDD<String, String> intersection = rdd1.intersection(rdd3);
intersection.foreach(new VoidFunction<Tuple2<String,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, String> arg0) throws Exception {
System.out.println(arg0);
}
}); //8、subtract 取两个数据集的差集
JavaPairRDD<String, String> subtract = rdd1.subtract(rdd3);
subtract.foreach(new VoidFunction<Tuple2<String,String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, String> arg0) throws Exception {
System.out.println(arg0);
}
}); JavaPairRDD<String, String> rdd11 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String,String>("zhangsan","aaa"),
new Tuple2<String,String>("zhangsan","bbb"),
new Tuple2<String,String>("zhangsan","ccc"),
new Tuple2<String,String>("lisi","100"),
new Tuple2<String,String>("lisi","200"),
new Tuple2<String,String>("lisi","300"),
new Tuple2<String,String>("wangwu","c11"),
new Tuple2<String,String>("wangwu","c33"),
new Tuple2<String,String>("zhaoliu","d")
));
JavaPairRDD<String, String> rdd22 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String,String>("zhangsan","1"),
new Tuple2<String,String>("zhangsan","222"),
new Tuple2<String,String>("zhangsan","333"),
new Tuple2<String,String>("lisi","ddd"),
new Tuple2<String,String>("lisi","eee"),
new Tuple2<String,String>("lisi","ttt"),
new Tuple2<String,String>("niuer","ffff")
)); //9、cogroup当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
//将两个rdd中相同key对应的value各自组成一个集合
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> cogroup = rdd11.cogroup(rdd22);
cogroup.foreach(new VoidFunction<Tuple2<String,Tuple2<Iterable<String>,Iterable<String>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> arg0) throws Exception {
System.out.println(arg0);
/*
* (zhangsan,([aaa, bbb, ccc],[1, 222, 333]))
(wangwu,([c11, c33],[]))
(niuer,([],[ffff]))
(zhaoliu,([d],[]))
(lisi,([100, 200, 300],[ddd, eee, ttt]))
*/
}
}); //10、mapPartition 与map类似,遍历的单位是每个partition上的数据 map是一个个处理。
//类型 foreachPartition 遍历的是每个partition上的数据 foreach则是一个个遍历
JavaRDD<String> rdd4 = sc.parallelize(Arrays.asList("a","b","c","d","e","f","g"),3);
rdd4.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Iterator<String> iter) throws Exception {
List<String> list = new ArrayList<String>();
System.out.println("begin");
while(iter.hasNext())
{
String s = iter.next();
System.out.println(s);
list.add(s);
}
System.out.println("end");
return list;
}
}).collect(); rdd4.foreachPartition(new VoidFunction<Iterator<String>>() {
private static final long serialVersionUID = 1L; @Override
public void call(Iterator<String> iter) throws Exception {
System.out.println("开始");
while(iter.hasNext())
{
System.out.println(iter.next());
}
System.out.println("结束");
}
});
//11、distinct(map+reduceByKey+map) reduceByKey有分组功能
JavaRDD<String> rdd5 = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
JavaRDD<String> distinct = rdd5.distinct();
distinct.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String arg0) throws Exception {
System.out.println(arg0);
}
}); //12、mapPartitionWithIndex 类似于mapPartitions,
//会将RDD中的partition索引下标带出来,index 是每个partition的索引下标
JavaRDD<String> rdd6 = sc.parallelize(Arrays.asList(
"love1","love2","love3","love4",
"love5","love6","love7","love8",
"love9","love10","love11","love12"
),3); //Function2 第一个参数是分区索引 第二个是这个分区值的list集合 第三个是经过call处理之后的返回值
JavaRDD<String> mapPartitionsWithIndex = rdd6.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
private static final long serialVersionUID = 1L;
@Override
//入参iter 是每个分区的集合的迭代器
public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
List<String> list = new ArrayList<String>(); while(iter.hasNext()) {
String one = iter.next();
list.add("rdd1 partition index = 【"+index+"】, value = 【"+one+"】");
}
return list.iterator();
}
}, true);
mapPartitionsWithIndex.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L; @Override
public void call(String v) throws Exception {
System.out.println(v);
}
});;
//13、repartition 增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle)
//repartition 是有shuffle的算子,可以对RDD重新分区。可以增加分区,也可以减少分区。
//repartition = coalesce(numPartitions,true)
JavaRDD<String> repartition = mapPartitionsWithIndex.repartition(2);
repartition.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String v) throws Exception {
System.out.println(v);
}
}); //14、coalesce coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
//coalesce 与repartition一样,可以对RDD进行分区,可以增多分区,也可以减少分区。
// coalsece(numPartitions,shuffle [Boolean = false]) false不产生shuffle 窄依赖 true就生成宽依赖 产生shuffle
//spark有空分区的概念 但是往多了分区不会产生空分区
//true为产生shuffle,false不产生shuffle。默认是false。
//如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
JavaRDD<String> coalesce = mapPartitionsWithIndex.coalesce(4, false);
coalesce.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String v) throws Exception {
System.out.println(v);
}
}); //15、groupByKey 作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
JavaPairRDD<String, Integer> rdd7 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String,Integer>("zhangsan",100),
new Tuple2<String,Integer>("zhangsan",200),
new Tuple2<String,Integer>("lisi",300),
new Tuple2<String,Integer>("lisi",400),
new Tuple2<String,Integer>("wangwu",500),
new Tuple2<String,Integer>("wangwu",600)
),2);
JavaPairRDD<String, Iterable<Integer>> groupByKey = rdd7.groupByKey();
groupByKey.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> v) throws Exception {
System.out.println(v);
/*
* (zhangsan,[100, 200])
(wangwu,[500, 600])
(lisi,[300, 400])
*/
}
});
//16、zip 将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。
JavaPairRDD<String, String> rdd8 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String,String>("zhangsan","18"),
new Tuple2<String,String>("zhangsan","18"),
new Tuple2<String,String>("lisi","19"),
new Tuple2<String,String>("lisi","190"),
new Tuple2<String,String>("wangwu","100"),
new Tuple2<String,String>("wangwu","200")
),2);
JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, String>> zip = rdd7.zip(rdd8);
zip.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Tuple2<String,String>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Tuple2<String, Integer>, Tuple2<String, String>> v) throws Exception {
System.out.println(v);
/*
* ((zhangsan,100),(zhangsan,18))
((zhangsan,200),(zhangsan,18))
((lisi,300),(lisi,19))
((lisi,400),(lisi,190))
((wangwu,500),(wangwu,100))
((wangwu,600),(wangwu,200))
*/
}
}); //17、zipWithIndex 该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
JavaPairRDD<Tuple2<String, Integer>, Long> zipWithIndex = rdd7.zipWithIndex();
zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Long>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Tuple2<String, Integer>, Long> v) throws Exception {
System.out.println(v);
/*
* ((zhangsan,100),0)
((zhangsan,200),1)
((lisi,300),2)
((lisi,400),3)
((wangwu,500),4)
((wangwu,600),5)
*/
}
}); //Action 算子
//18、reduce 对RDD中的每个元素 使用传递的逻辑去处理
JavaRDD<Integer> rdd9 = sc.parallelize(Arrays.asList(1,2,3,4,5));
Integer reduce = rdd9.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
//将相邻两个元素相加返回
return v1+v2;
}
});
System.out.println(reduce);//15 //19、countByKey() 对RDD中相同的key的元素计数
Map<String, Object> countByKey = rdd8.countByKey();
Set<Entry<String, Object>> entrySet = countByKey.entrySet();
for(Entry<String, Object> entry :entrySet) {
String key = entry.getKey();
Object value = entry.getValue();
System.out.println("key = "+key+",value ="+value);
} //20、countByValue() 对RDD中相同的元素计数
//根据数据集每个元素相同的内容来计数,不是针对K-V里的V,而是整个K-V。返回相同内容的元素对应的条数
Map<Tuple2<String, String>, Long> countByValue = rdd8.countByValue();
Set<Entry<Tuple2<String, String>, Long>> entrySet1 = countByValue.entrySet();
for(Entry<Tuple2<String, String>, Long> entry :entrySet1) {
Tuple2<String, String> key = entry.getKey();
Long value = entry.getValue();
System.out.println("key = "+key+",value ="+value);
/*
* key = (lisi,19),value =1
key = (wangwu,100),value =1
key = (zhangsan,18),value =2
key = (lisi,190),value =1
key = (wangwu,200),value =1
*/
} //close()方法里调用了stop
sc.close();
//sc.stop();
}
}

Scala示例:

package com.wjy

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer object TransFormation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
val sc = new SparkContext(conf);
//1、创建RDD 查看默认分区数,第二个参数指定分区数 未指定 默认1 分区数决定task数
val rdd = sc.parallelize(Array("a","a","b","b","b","c","d"), 3);
//length 或size
println("分区数:"+rdd.partitions.size);
//创建RDD方式2
val rdd2 = sc.makeRDD(Array("a","b","b","b","c","d"), 2);
//创建K-V格式RDD
val rdd3 = sc.makeRDD(Array(("张三","v1"),("王五","v2"),("李四","v3"),("牛二","v4")));
val rdd4 = sc.makeRDD(List(("张三",""),("赵六",""),("李四","v3"),("王二","")));
//2、join
rdd3.join(rdd4).foreach(println);
/*
* (张三,(v1,111))
(李四,(v3,4534))
*/ //3、leftOuterJoin
rdd3.leftOuterJoin(rdd4).foreach(println);
/*
* (张三,(v1,Some(111)))
(李四,(v3,Some(4534)))
(王五,(v2,None))
(牛二,(v4,None))
*/ //4、rightOuterJoin
rdd3.rightOuterJoin(rdd4).foreach(println);
/*
* (张三,(Some(v1),111))
(王二,(None,677))
(李四,(Some(v3),4534))
(赵六,(None,4444))
*/ //5、fullOuterJoin
rdd3.fullOuterJoin(rdd4).foreach(println);
/*
* (张三,(Some(v1),Some(111)))
(王二,(None,Some(677)))
(李四,(Some(v3),Some(4534)))
(赵六,(None,Some(4444)))
(王五,(Some(v2),None))
(牛二,(Some(v4),None))
*/ //6、union
rdd3.union(rdd4).foreach(println);
/*
* (张三,v1)
(王五,v2)
(李四,v3)
(牛二,v4)
(张三,111)
(赵六,4444)
(李四,4534)
(王二,677)
*/
//7、交集intersection
rdd3.intersection(rdd4).foreach(println);
//8、差集subtract
rdd3.subtract(rdd4).foreach(println);
//9、cogroup
rdd3.cogroup(rdd4).foreach(println);
//10、mapPartition true表示与之前的分区器相同
rdd3.mapPartitions(iter=>{iter}, true);
//11、distinct
rdd3.distinct().foreach(println);
//12、mapPartitionsWithIndex
val rdd5 = sc.parallelize(Array(
"love1","love2","love3","love4",
"love5","love6","love7","love8",
"love9","love10","love11","love12"
),3);
rdd5.mapPartitionsWithIndex((index,iter)=>{
val list=new ListBuffer[String]();
while(iter.hasNext){
list.+=("rdd1 partition index = "+index+",value = "+iter.next());
}
list.iterator;
}, true);
//13、repartition
rdd5.repartition(4).foreach(println);
//14、coalesce
rdd5.coalesce(4, false).foreach(println);
//15、groupByKey
val rdd6 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"),(3,"d")),3);
rdd6.groupByKey().foreach(println);
//16、zipWithIndex
rdd6.zipWithIndex().foreach(println);
//17、zip
val rdd8= sc.parallelize(Array((11,"aa"),(22,"bb"),(33,"cc"),(33,"dd")),3);
rdd6.zip(rdd8).foreach(println);
/*
* ((1,a),(11,aa))
((2,b),(22,bb))
((3,c),(33,cc))
((3,d),(33,dd))
*/ //Action 算子
//18、countByKey
rdd6.countByKey().foreach(println);
//19、countByValue
rdd6.countByValue().foreach(println);
//20、reduce
val rdd7=sc.parallelize(Array(1,2,3,4,5));
val rev = rdd7.reduce((v1,v2)=>{
v1+v2;
});
println(rev);//15 sc.stop();
} }

五、Action行动算子
Action类算子叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
take(n):返回一个包含数据集前n个元素的集合。
first:first=take(1),返回数据集中的第一个元素。
foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
collect:将计算结果回收到Driver端。

【Spark-core学习之二】 RDD和算子-LMLPHP

六、控制算子
控制算子有三种:cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。
cache和persist都是懒执行的。必须有一个action类算子触发执行。
checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

【Spark-core学习之二】 RDD和算子-LMLPHP

1、cache:默认将RDD的数据持久化到内存中,cache是懒执行。

 SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95"); lines = lines.cache();
long startTime = System.currentTimeMillis();
long count = lines.count();
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime)); long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-countStartTime)); jsc.stop();

2、persist:可以指定持久化的级别。最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。
持久化级别如下:【Spark-core学习之二】 RDD和算子-LMLPHP

【Spark-core学习之二】 RDD和算子-LMLPHP
cache和persist的注意事项:
(1)cache和persist都是懒执行,必须有一个action类算子触发执行。
(2)cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
(3)cache和persist算子后不能立即紧跟action算子。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
checkpoint 的执行原理:
(1)当RDD的job执行完毕后,会从finalRDD从后往前回溯。
(2)当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
(3)Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
使用:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("checkpoint");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setCheckpointDir("./checkpoint");
JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
parallelize.checkpoint();
parallelize.count();
sc.stop();

参考:
Spark:https://www.cnblogs.com/qingyunzong/category/1202252.html

Spark之RDD

05-11 22:43