大数据全系列 教程
1869个小节阅读:467.6k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Flink 1.12 中,提供流批统一的 FileSink connector,以替换现有的StreamingFileSink connector,允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现。
需要添加Maven依赖
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.14.6</version>
</dependency>
演示:使用自定义数据源产生的基站通讯记录数据,写入本地文件系统文件中。
xxxxxxxxxx
package com.itbaizhan.flink.scala.sink
import com.itbaizhan.flink.scala.source.{MyDefinedSource, MyDefinedSource2, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.concurrent.TimeUnit
object FileSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
//添加隐式转
import org.apache.flink.streaming.api.scala._
val sourceFunctionDs: DataStream[StationLog] = env.addSource(new MyDefinedSource())
//将流中元素StationLog对象转化为字符串对象
val ds: DataStream[String] = sourceFunctionDs.map(ele => ele.toString)
val fileSink: FileSink[String] = FileSink
//设置存储文件的格式,Row存储
.forRowFormat(new Path("datas/file-sink"), new SimpleStringEncoder[String]("UTF-8"))
//设置桶的分配策略:默认基于时间的分配器,每小时产生一个桶,格式如:yyyy-MM-dd--HH
.withBucketAssigner(new DateTimeBucketAssigner[String]())
//设置数据文件的滚动策略
.withRollingPolicy(
DefaultRollingPolicy.builder()
//设置文件多大后生产新的文件,默认128M
.withMaxPartSize(1024 * 1024 * 1024) //1G
//每个多长时间生产一个新的文件,默认1分钟
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) //15分钟
//桶是否活跃的检查时间间隔
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
) //设置文件名称
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("itbaizhan") //前缀
.withPartSuffix(".log") //后缀
.build()
)
.build()
//为数据流添加sink
ds.sinkTo(fileSink)
env.execute("FileSinkDemo")
}
}
注意:使用FileSink 时需要启用Checkpoint ,每次做Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。