大数据全系列 教程
1869个小节阅读:467.2k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
StateBackend(状态后端)它是一种保存机制。
Flink 1.13之前状态后端存储的三种方式:Memory(内存)、FS(文件系统)和RocksDB(嵌入式数据库)。
MemoryStateBackEnd | FsStateBackEnd | RocksDBStateBackEnd | |
---|---|---|---|
state | TaskManager内存 | TaskManager内存 | RocksDB内存数据库 |
checkpoint | JobManager内存 | FS文件系统 | FS文件系统 |
推荐使用的场景 | 本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂或挂掉影响不大的情况。不推荐在生产场景使用 | 常规使用状态的作业,例如分钟级窗口聚合或 join、需要开启HA的作业 | 超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业。 |
构造方法 | MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) | FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) | RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) |
是否要依赖 | 不需要 | 不需要 | 需要 |
容量限制 | 1.maxStateSize单个State的最大默认值5MB 2.maxStateSize<=akka.framesize(默认10MB) 3.总大小不能超过JobManager内存。 | 1.单TaskManager上State总量不能超过该节点的内存。 2.总大小不超过配置的文件系统的容量 | 1.单TaskManager上State总量不能超过该节点的内存+磁盘总和。 2.单个Key的State最大2GB 3.总大小不超过配置的文件系统的容量 |
RocksDBStateBackEnd pom依赖配置:
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.6</version>
</dependency>
小结:在Flink 1.13之前版本,状态State存储和Checkpoint检查点两个功能是混在一起的,即把状态存储和检查点的创建概念笼统的混在一起,导致初学者对此部分感觉很混乱,很难理解。
Flink 1.13开始将状态State
和检查点Checkpoint
两者区分开来。
State Backend 的概念变窄,只描述状态访问和存储。
HashMapStateBackend内存存储
xxxxxxxxxx
env.setStateBackend(new HashMapStateBackend())
EmbeddedRocksDBStateBackend:内置内存数据库RocksDB,先内存后磁盘,字节数组。
xxxxxxxxxx
env.setStateBackend(new EmbeddedRocksDBStateBackend())
CheckpointStorage,描述的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到HDFS文件系统。
//JobManagerCheckpointStorage JM内存
env.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage)
//FileSystemCheckpointStorage文件系统(可以本地和HDFS)
env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoint-dir"))
env.getCheckpointConfig.setCheckpointStorage("hdfs://mycluster/flink/checkpoint-dir")