大数据全系列 教程
1869个小节阅读:464.8k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
在Flink流式计算程序中,如果设置Checkpoint检查点以后,当应用程序运行失败,可以从检查点恢复:
手动重启
应用,从Checkpoint恢复状态,程序代码更新,系统升级(人为停止程序)等
自动重启
应用,从Checkpoint恢复状态
手动恢复操作步骤
WordCountCheckpointDemo
拷贝一份WordCountCkptDemo
,并做如下修改:改动一:
xxxxxxxxxx
//TODO 1.为了方便本地运行时查看Flink WebUI
val conf:Configuration = new Configuration()
conf.setString("rest.port", "8081")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment
.createLocalEnvironment(1,conf)
setJobCheckpoint(env)
改为:
xxxxxxxxxx
//TODO 1.集群运行
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//TODO 2.为当前job设置state和检查点相关
setJobCheckpoint(env,args(0))
改动二:
xxxxxxxxxx
def setJobCheckpoint(env: StreamExecutionEnvironment) = {
......
//JM内存存储
//ckptConfig.setCheckpointStorage(new JobManagerCheckpointStorage)
//HDFS文件系统存储
//ckptConfig.setCheckpointStorage("hdfs://mycluster/flink/ckpts/")
//本地文件系统存储
ckptConfig.setCheckpointStorage("file:///D:/ckpts")
...
}
改为
xxxxxxxxxx
def setJobCheckpoint(env: StreamExecutionEnvironment,path:String) = {
......
//JM内存存储
//ckptConfig.setCheckpointStorage(new JobManagerCheckpointStorage)
//HDFS文件系统存储
ckptConfig.setCheckpointStorage(path)
//本地文件系统存储
//ckptConfig.setCheckpointStorage("file:///D:/ckpts")
...
}
打包项目
启动HDFS集群
xxxxxxxxxx
[root@node1 ~]# starthdfs.sh
或者
[root@node1 ~]# startha.sh
启动Flink集群(Standalone 分布式集群)
xxxxxxxxxx
[root@node1 ~]# /opt/flink-salone/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3
node3上执行
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
访问FlinkwebUI:http://node1:8081
使用Flink WebUI提交
填写如下参数
xxxxxxxxxx
com.itbaizhan.flink.scala.state.WordCountCkptDemo
hdfs://mycluster/flink/ckpts
node3上输入数据
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
a b c
a b
a
取消job
查看HDFS目录/flink/ckpts/xxx,Checkpoint存文件
重新启动任务并指定检查点文件路径
填写如下参数
xxxxxxxxxx
com.itbaizhan.flink.scala.state.WordCountCkptDemo
hdfs://mycluster/flink/ckpts
hdfs://mycluster/flink/ckpts/90797f01475ba52efeda8b8d2bc17a63/chk-29
node3上再次输入数据a b c
xxxxxxxxxx
[root@node3 ~]# nc -lk 8888
a b c
a b
a #以上之前输入的
a b c
查询输出结果
Cancel当前job。