4. 缓存
缓存的意义
缓存相关的 API
缓存级别以及最佳实践
4.1. 缓存的意义
- 使用缓存的原因 - 多次使用 RDD
需求: 在日志文件中找到访问次数最少的 IP 和访问次数最多的 IP
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string") val sc = new SparkContext(conf) val interimRDD = sc.textFile("dataset/access_log_sample.txt") .map(item => (item.split(" ")(0), 1)) .filter(item => StringUtils.isNotBlank(item._1)) .reduceByKey((curr, agg) => curr + agg) val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first() val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first() println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore") sc.stop()
这是一个 Shuffle 操作, Shuffle 操作会在集群内进行数据拷贝 在上述代码中, 多次使用到了
interimRDD
, 导致文件读取两次, 计算两次, 有没有什么办法增进上述代码的性能?- 使用缓存的原因 - 容错
当在计算 RDD3 的时候如果出错了, 会怎么进行容错?
会再次计算 RDD1 和 RDD2 的整个链条, 假设 RDD1 和 RDD2 是通过比较昂贵的操作得来的, 有没有什么办法减少这种开销?
上述两个问题的解决方案其实都是 缓存
, 除此之外, 使用缓存的理由还有很多, 但是总结一句, 就是缓存能够帮助开发者在进行一些昂贵操作后, 将其结果保存下来, 以便下次使用无需再次执行, 缓存能够显著的提升性能.
所以, 缓存适合在一个 RDD 需要重复多次利用, 并且还不是特别大的情况下使用, 例如迭代计算等场景.
4.2. 缓存相关的 API
- 可以使用
cache
方法进行缓存 val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string") val sc = new SparkContext(conf) val interimRDD = sc.textFile("dataset/access_log_sample.txt") .map(item => (item.split(" ")(0), 1)) .filter(item => StringUtils.isNotBlank(item._1)) .reduceByKey((curr, agg) => curr + agg) .cache() val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first() val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first() println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore") sc.stop()
缓存 方法签名如下
cache(): this.type = persist()
cache 方法其实是
persist
方法的一个别名- 也可以使用 persist 方法进行缓存
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string") val sc = new SparkContext(conf) val interimRDD = sc.textFile("dataset/access_log_sample.txt") .map(item => (item.split(" ")(0), 1)) .filter(item => StringUtils.isNotBlank(item._1)) .reduceByKey((curr, agg) => curr + agg) .persist(StorageLevel.MEMORY_ONLY) val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first() val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first() println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore") sc.stop()
缓存 方法签名如下
persist(): this.type persist(newLevel: StorageLevel): this.type
persist
方法其实有两种形式,persist()
是persist(newLevel: StorageLevel)
的一个别名,persist(newLevel: StorageLevel)
能够指定缓存的级别- 缓存其实是一种空间换时间的做法, 会占用额外的存储资源, 如何清理?
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string") val sc = new SparkContext(conf) val interimRDD = sc.textFile("dataset/access_log_sample.txt") .map(item => (item.split(" ")(0), 1)) .filter(item => StringUtils.isNotBlank(item._1)) .reduceByKey((curr, agg) => curr + agg) .persist() interimRDD.unpersist() val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first() val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first() println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore") sc.stop()
清理缓存 根据缓存级别的不同, 缓存存储的位置也不同, 但是使用
unpersist
可以指定删除 RDD 对应的缓存信息, 并指定缓存级别为NONE
4.3. 缓存级别
其实如何缓存是一个技术活, 有很多细节需要思考, 如下
是否使用磁盘缓存?
是否使用内存缓存?
是否使用堆外内存?
缓存前是否先序列化?
是否需要有副本?
如果要回答这些信息的话, 可以先查看一下 RDD 的缓存级别对象
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)
val interimRDD = sc.textFile("dataset/access_log_sample.txt")
.map(item => (item.split(" ")(0), 1))
.filter(item => StringUtils.isNotBlank(item._1))
.reduceByKey((curr, agg) => curr + agg)
.persist()
println(interimRDD.getStorageLevel)
sc.stop()
打印出来的对象是 StorageLevel
, 其中有如下几个构造参数
根据这几个参数的不同, StorageLevel
有如下几个枚举对象
| false | false | false | false | 1 |
| true | false | false | false | 1 |
| true | false | false | false | 2 |
| false | true | false | true | 1 |
| false | true | false | true | 2 |
| false | true | false | false | 1 |
| false | true | false | false | 2 |
| true | true | false | true | 1 |
| true | true | false | true | 2 |
| true | true | false | false | 1 |
| true | true | false | false | 2 |
| true | true | true | false | 1 |
Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:
如果您的 RDD 适合于默认存储级别(MEMORY_ONLY),leave them that way。这是 CPU 效率最高的选项,允许 RDD 上的操作尽可能快地运行.
如果不是,试着使用 MEMORY_ONLY_SER 和 selecting a fast serialization library 以使对象更加节省空间,但仍然能够快速访问。(Java和Scala)
不要溢出到磁盘,除非计算您的数据集的函数是昂贵的,或者它们过滤大量的数据。否则,重新计算分区可能与从磁盘读取分区一样快.
如果需要快速故障恢复,请使用复制的存储级别(例如,如果使用 Spark 来服务 来自网络应用程序的请求)。All 存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.