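In Flink 1.14, applications use the new API, which stays compatible with the State and Checkpoint settings of earlier versions.
How to use each of the three approaches: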
MemoryStateBackend (in-memory state backend) [for awareness only]
Global configuration in flink-conf.yaml:
state.backend: hashmap
state.checkpoint-storage: jobmanager
Per-job setting in the application code:
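// Working state is kept as objects on the TaskManager JVM heap
env.setStateBackend(new HashMapStateBackend())
// Checkpoint snapshots are stored in the JobManager's memory
env.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage)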
FsStateBackend (file system state backend)
Global configuration in flink-conf.yaml:
state.backend: hashmap
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///xx    # or hdfs://mycluster/flink/checkpoints/
Per-job setting in the application code:
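// Working state is kept on the TaskManager JVM heap, as with the in-memory setup
env.setStateBackend(new HashMapStateBackend())
// Checkpoint snapshots are written to the given file system directory
env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoint-dir"))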
RocksDBStateBackend (RocksDB state backend)
Global configuration in flink-conf.yaml:
state.backend: rocksdb
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///    # or hdfs://mycluster/flink/checkpoints/
Per-job setting in the application code:
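// Working state is kept in an embedded RocksDB instance on each TaskManager's local disk
env.setStateBackend(new EmbeddedRocksDBStateBackend())
// Checkpoint snapshots are written to the given file system directory
env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoint-dir"))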
pom dependency for RocksDBStateBackend:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.6</version>
</dependency>
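The per-job snippets above leave out their imports. They also assume that checkpointing is enabled elsewhere, because no checkpoints are taken until it is turned on. The following is a minimal Scala sketch that wires all three setups into one job.

It makes these assumptions:
- `flink-streaming-scala_2.12` 1.14.x and the `flink-statebackend-rocksdb_2.12` dependency above are on the classpath.
- The object name `StateBackendSetup`, the mode names `memory`/`fs`/`rocksdb`, the 10-second interval, the `hdfs://mycluster/...` path and the `true` flag for incremental checkpoints are illustrative choices, not part of the original setup.

```scala
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.{CheckpointStorage, StateBackend}
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.{FileSystemCheckpointStorage, JobManagerCheckpointStorage}
import org.apache.flink.streaming.api.scala._

// Hypothetical helper: mode names are illustrative labels for the three setups above.
object StateBackendSetup {

  // Where working state lives while the job runs.
  def backendFor(mode: String): StateBackend = mode match {
    case "memory" | "fs" => new HashMapStateBackend()
    // `true` turns on incremental checkpoints (cf. state.backend.incremental)
    case "rocksdb"       => new EmbeddedRocksDBStateBackend(true)
    case other           => throw new IllegalArgumentException(s"unknown mode: $other")
  }

  // Where checkpoint snapshots are written.
  def storageFor(mode: String, checkpointDir: String): CheckpointStorage = mode match {
    case "memory"         => new JobManagerCheckpointStorage() // JobManager memory, dir unused
    case "fs" | "rocksdb" => new FileSystemCheckpointStorage(checkpointDir)
    case other            => throw new IllegalArgumentException(s"unknown mode: $other")
  }

  def main(args: Array[String]): Unit = {
    val mode = if (args.nonEmpty) args(0) else "memory"
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // No checkpoints are taken (so the storage is never used) unless checkpointing is on.
    env.enableCheckpointing(10000L) // interval in milliseconds (assumed value)
    env.setStateBackend(backendFor(mode))
    env.getCheckpointConfig.setCheckpointStorage(
      storageFor(mode, "hdfs://mycluster/flink/checkpoint-dir"))

    // Trivial pipeline so the job has something to execute.
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute(s"state-backend-demo-$mode")
  }
}
```

Pass `memory`, `fs` or `rocksdb` as the first program argument. For the file-system modes, the checkpoint directory must be reachable from every JobManager and TaskManager, as the `state.checkpoints.dir` row in the table below points out.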
Appendix: Checkpoints and State backends configuration options
Key | Default | Type | Description |
---|---|---|---|
state.backend | (none) | String | The state backend to be used to store state. The implementation can be specified either via their shortcut name, or via the class name of a StateBackendFactory. If a factory is specified it is instantiated via its zero argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called. Recognized shortcut names are 'hashmap' and 'rocksdb'. |
state.checkpoint-storage | (none) | String | The checkpoint storage implementation to be used to checkpoint state. The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called. Recognized shortcut names are 'jobmanager' and 'filesystem'. |
state.checkpoints.dir | (none) | String | The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers). |
state.savepoints.dir | (none) | String | The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). |
state.backend.incremental | false | Boolean | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. |
state.backend.local-recovery | false | Boolean | This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). |
state.checkpoints.num-retained | 1 | Integer | The maximum number of completed checkpoints to retain. |
taskmanager.state.local.root-dirs | (none) | String | The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. |
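The `state.backend.incremental` row says that an incremental checkpoint stores only a diff from the previous checkpoint. The following Scala sketch is a toy model of that idea and nothing more. The names `CheckpointModel`, `Delta`, `diff`, `restore` and `deltaSize` are hypothetical, and state is simplified to a `Map[String, Int]`. Flink's actual incremental checkpoints with EmbeddedRocksDBStateBackend track RocksDB SST files, not individual keys.

```scala
// Toy model only (hypothetical names): "store only the diff from the previous checkpoint".
// Real RocksDB incremental checkpoints work on SST files, not on individual keys.
object CheckpointModel {
  type Snapshot = Map[String, Int]

  // What an incremental checkpoint would persist: new/changed entries plus deleted keys.
  final case class Delta(upserts: Map[String, Int], removals: Set[String])

  def diff(previous: Snapshot, current: Snapshot): Delta = {
    // Entries that are new, or whose value changed since the previous checkpoint.
    val upserts = current.filter { case (k, v) => !previous.get(k).contains(v) }
    // Keys that existed before but are gone now.
    val removals = previous.keySet -- current.keySet
    Delta(upserts, removals)
  }

  // Recovery: start from the last full snapshot and apply each delta in order.
  def restore(base: Snapshot, deltas: Seq[Delta]): Snapshot =
    deltas.foldLeft(base) { (state, d) => (state -- d.removals) ++ d.upserts }

  // Number of entries written for one checkpoint (cf. the "delta checkpoint size" in the web UI).
  def deltaSize(d: Delta): Int = d.upserts.size + d.removals.size
}
```

The model shows the trade-off. Each checkpoint writes less, but recovery has to combine the base snapshot with every later delta.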