大数据全系列 教程
1869个小节阅读:464.7k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例需求:词频统计WordCount,读取node3:8888数据源,录入产生数据,设置Checkpoint,运行程序,查看Checkpoint检查点数据存储。
编码实现:
xxxxxxxxxx
package com.itbaizhan.flink.scala.state
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, JobManagerCheckpointStorage}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object WordCountCheckpointDemo {
def main(args: Array[String]): Unit = {
import org.apache.flink.streaming.api.scala._
//TODO 1.为了方便本地运行时查看Flink WebUI
val conf:Configuration = new Configuration()
conf.setString("rest.port", "8081")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment
.createLocalEnvironment(1,conf)
//TODO 2.为当前job设置state和检查点相关
setJobCheckpoint(env)
env.socketTextStream("node3", 8888)
.filter(_.trim.length>0)//过滤掉空行
.flatMap(_.split("\\s+"))//按空格拆分
.map((_, 1)) //每个单词计数为1
.keyBy(_._1)
.sum(1)//分组统计
.printToErr()
//触发执行-execute
env.execute("WordCountCheckpointDemo")
}
//TODO 3.方法中设置相关属性
def setJobCheckpoint(env: StreamExecutionEnvironment) = {
//1.设置Checkpoint间隔时间,单位毫秒
env.enableCheckpointing(2000)
//2.设置state状态后端,内存存储
env.setStateBackend(new HashMapStateBackend())
//获取env的CheckpointConfig对象
val ckptConfig: CheckpointConfig = env.getCheckpointConfig
//3.检查点存储
//JM内存存储
//ckptConfig.setCheckpointStorage(new JobManagerCheckpointStorage)
//HDFS文件系统存储(暂不演示)
//ckptConfig.setCheckpointStorage("hdfs://mycluster/flink/ckpts")
//本地文件系统存储
ckptConfig.setCheckpointStorage("file:///D:/ckpts")
//4.设置checkpoint执行模式 默认EXACTLY_ONCE精准一次,需要Source和Sink的支持
ckptConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//5.设置两个checkpoint之间最少的等待时间 单位毫秒
ckptConfig.setMinPauseBetweenCheckpoints(500)
//5.设置同一时刻允许多少个checkpoint同时执行
ckptConfig.setMaxConcurrentCheckpoints(1)
//6.设置checkpoint超时时间 如果一次ckpt在30s内还不能完成,说明本次失败则丢弃
ckptConfig.setCheckpointTimeout(30000)
//7.设置允许ckpt失败次数
ckptConfig.setTolerableCheckpointFailureNumber(3)
//8.设置当Cancel当前job时是否清理检查点,默认 Checkpoint会在作业被Cancel时被清理(被删除)
//DELETE_ON_CANCELLATION,当作业被取消时,删除外部的checkpoint(默认值)
//RETAIN_ON_CANCELLATION,当作业被取消时,保留外部的checkpoint
ckptConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//9.设置重启策略 noRestart
env.setRestartStrategy(RestartStrategies.noRestart())
}
}