大数据全系列 教程
1869个小节阅读:468.1k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
cache缓存如何保存的?
CheckPoint检查点其实就是通过将RDD中间结果写入磁盘(仅支持磁盘存储
)。被设计为安全的
,且不保留血缘关系。
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
CheckPoint如何保持数据?
如上图CheckPoint存储RDD的数据,是收集各个分区的数据后进行统一存储,而缓存是分散存储的。
涉及到的API代码:
xxxxxxxxxx
//1.开启CheckPoint功能,并指定数据存在到HDFS的什么路径下
sc.setCheckpointDir("hdfs://mycluster/Checkpoints")
//2.调用checkpoint API 保存数据即可
rdd3.checkpoint()
代码演示:
xxxxxxxxxx
package com.itbaizhan.core
//1.导入spark下的SparkConf, SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CoreCheckpoint {
def main(args: Array[String]): Unit = {
//2.构建SparkConf对象,并设置本地运行和程序的名称
val conf = new SparkConf().setMaster("local[*]").setAppName("checkpoint")
//3.通过SparkConf对象构建SparkContext对象
val sc = new SparkContext(conf)
//B1.开启Checkpoint功能,并指定数据存储到HDFS的什么路径下
sc.setCheckpointDir("hdfs://mycluster/Checkpoints")
val fileRdd: RDD[String] = sc.textFile("hdfs://mycluster/wordcount/input/wc.txt",1)
val wordsRdd: RDD[String] = fileRdd.flatMap(_.split(" "))
val wordAndOneRdd: RDD[(String, Int)] = wordsRdd.map((_,1))
//B2.对wordAndOneRdd进行检查点存在
wordAndOneRdd.checkpoint()
val resultRdd: RDD[(String, Int)] = wordAndOneRdd.reduceByKey(_ + _)
println(resultRdd.collect().mkString(","))
//B3.设计另外一个job
val groupByRdd: RDD[(String, Iterable[Int])] = wordAndOneRdd.groupByKey()
val groupByValueRdd: RDD[(String, Int)] = groupByRdd.mapValues(_.sum)
print(groupByValueRdd.collect().mkString(","))
//4.关闭sc
sc.stop()
}
}
运行后去hdfs文件系统查看。
实时效果反馈
1. 关于CheckPoint持久化的相关描述,正确的是:
A CheckPoint检查点其实就是通过将RDD中间结果写入磁盘(仅支持磁盘存储
)。
B CheckPoint被设计为安全的
,且不保留血缘关系。
C CheckPoint存储RDD的数据,是收集各个分区的数据后进行统一存储,而缓存是分散存储的。
D 以上三个选项都正确。
答案:
1=>D