SparkCore加强
- 重点:RDD的持久化和Checkpoint
- 提高拓展知识:Spark内核调度全流程,Spark的Shuffle
- 练习:热力图统计及电商基础指标统计
- combineByKey作为面试部分重点,可以作为扩展知识点
Spark算子补充
-
关联函数补充
-
join为主基础算子
-
# -*- coding: utf-8 -*- # Program function:演示join操作 from pyspark import SparkConf, SparkContext if __name__ == '__main__': print('PySpark join Function Program') # TODO:1、创建应用程序入口SparkContext实例对象 conf = SparkConf().setAppName("miniProject").setMaster("local[*]") sc = SparkContext.getOrCreate(conf) # TODO: 2、从本地文件系统创建RDD数据集 x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) y = sc.parallelize([(1001, "sales"), (1002, "tech")]) # TODO:3、使用join完成联合操作 print(x.join(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))] print(x.leftOuterJoin(y).collect()) print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))] sc.stop()
[掌握]RDD 持久化
为什么使用缓存
- 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
- 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
- 使用经验:一次缓存可以多次使用
如何进行缓存?
-
spark中提供cache方法
-
spark中提供persist方法
-
# -*- coding: utf-8 -*- # Program function:演示join操作 from pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevel import time if __name__ == '__main__': print('PySpark join Function Program') # TODO:1、创建应用程序入口SparkContext实例对象 conf = SparkConf().setAppName("miniProject").setMaster("local[*]") sc = SparkContext.getOrCreate(conf) # TODO: 2、从本地文件系统创建RDD数据集 x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")]) y = sc.parallelize([(1001, "sales"), (1002, "tech")]) # TODO:3、使用join完成联合操作 join_result_rdd = x.join(y) print(join_result_rdd.collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))] print(x.leftOuterJoin(y).collect()) print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))] # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY) join_result_rdd.cache() # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2) # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识 join_result_rdd.collect() # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count print(join_result_rdd.count()) time.sleep(600) sc.stop()
[掌握]RDD Checkpoint
-
为什么有检查点机制?
- 因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
- Spark的容错问题?
- 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
-
如何使用检查点机制?
- 指定数据保存在哪里?
- sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
- 对谁缓存?答案算子
- rdd1.checkpoint() 斩断依赖关系进行检查点
- 检查点机制触发方式
- action算子可以触发
- 后续的计算过程
- Spark机制直接从checkpoint中读取数据
- 实验过程还原:
-
检查点机制那些作用?
- 将数据和元数据保存在HDFS中
- 后续执行rdd的计算直接基于checkpoint的rdd
- 起到了容错的作用
-
面试题:如何实现Spark的容错?
- 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
- 2-否则查看checkpoint是否保存数据
- 3-否则根据依赖关系重建RDD
-
检查点机制案例
案例测试:
AI副业实战手册:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及实战案例,持续更新,实战类小册排名第一,做三个月挣不到钱找我退款,交个朋友的产品)
后记
📢博客主页:https://manor.blog.csdn.net
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 Maynor 原创,首发于 CSDN博客🙉
📢感觉这辈子,最深情绵长的注视,都给了手机⭐
📢专栏持续更新,欢迎订阅:https://blog.csdn.net/xianyu120/category_12453356.html