1. The file system connector itself is included in Flink and does not require an additional dependency.
2. A corresponding format needs to be specified for reading rows from and writing rows to a file system.
3. The file system connector allows reading from and writing to a local or distributed file system.
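A filesystem table is defined with a CREATE TABLE statement whose WITH clause sets 'connector' = 'filesystem' together with the required 'path' and 'format' options, as in the case study below.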
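To make that shape concrete, here is a minimal sketch of a hypothetical helper (`FileSystemDdl.filesystemTableDdl` is an illustrative name, not a Flink API) that assembles such a DDL string from a column list. It always emits the three required options and appends any extra ones; it does not escape option values, and the resulting string would simply be passed to `tableEnv.executeSql`.

```scala
object FileSystemDdl {
  // Hypothetical helper (not part of Flink): builds a CREATE TABLE statement for the
  // filesystem connector. 'connector', 'path' and 'format' are always emitted; extra
  // options such as sink.rolling-policy.* are appended in the order given.
  // Note: values are inserted verbatim, single quotes inside them are NOT escaped.
  def filesystemTableDdl(
      table: String,
      columns: Seq[(String, String)],
      path: String,
      format: String,
      extraOptions: Seq[(String, String)] = Seq.empty
  ): String = {
    val columnLines = columns
      .map { case (name, sqlType) => s"  `$name` $sqlType" }
      .mkString(",\n")
    val required = Seq("connector" -> "filesystem", "path" -> path, "format" -> format)
    val optionLines = (required ++ extraOptions)
      .map { case (key, value) => s"  '$key' = '$value'" }
      .mkString(",\n")
    s"CREATE TABLE $table (\n$columnLines\n) WITH (\n$optionLines\n)"
  }

  def main(args: Array[String]): Unit = {
    // Example: a sink table shaped like the one used in the case study
    val ddl = filesystemTableDdl(
      "tb_station_sink",
      Seq("sid" -> "STRING", "call_type" -> "STRING", "call_time" -> "BIGINT", "duration" -> "BIGINT"),
      "datas/stations",
      "json",
      Seq("sink.parallelism" -> "1")
    )
    println(ddl)
  }
}
```

Generating the DDL this way only avoids hand-writing the string concatenation; hand-written SQL, as in the case study, works just as well.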
Case requirement: read data from the Kafka topic station-topic and use the FileSystem connector to write it to local files in JSON format.
Add the dependency for the JSON format:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.14.6</version>
</dependency>
```
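Write the SQL DDL statement for the FileSystem sink:

```sql
CREATE TABLE tb_station_sink (
  `sid` STRING,
  `call_type` STRING,
  `call_time` BIGINT,
  `duration` BIGINT
) WITH (
  'connector' = 'filesystem',
  'path' = 'datas/stations',
  'format' = 'json',
  'sink.rolling-policy.file-size' = '1MB',
  'sink.rolling-policy.rollover-interval' = '2 min',
  'sink.rolling-policy.check-interval' = '1 min',
  'sink.parallelism' = '1'
)
```

With these rolling-policy settings, the sink closes the current part file and starts a new one when the file grows past 1MB or has been open for 2 minutes; whether the 2-minute limit has been reached is checked once per minute. 'sink.parallelism' = '1' runs the sink as a single parallel instance.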
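The three sink.rolling-policy.* options interact in a way that is easy to misread: the size limit is checked as records are written, while the rollover interval is only evaluated when the periodic check fires. The sketch below is a simplified model of that behavior, not Flink's implementation; the names `RollingPolicySketch`, `Policy` and `firstTimedRoll` are hypothetical, and it assumes checks fire at whole multiples of the check interval measured from writer start (time 0).

```scala
object RollingPolicySketch {
  // Simplified model of the rolling options (all times in ms, relative to writer start).
  final case class Policy(maxPartSizeBytes: Long, rolloverIntervalMs: Long, checkIntervalMs: Long)

  // Size-based rolling: evaluated on every write; the file rolls once it grows past the limit.
  def rollOnSize(p: Policy, partSizeBytes: Long): Boolean =
    partSizeBytes > p.maxPartSizeBytes

  // Time-based rolling: true once the file has been open for at least the rollover interval.
  def rollOnTimer(p: Policy, createdAtMs: Long, nowMs: Long): Boolean =
    nowMs - createdAtMs >= p.rolloverIntervalMs

  // First periodic check (at a multiple of checkIntervalMs) at which a file opened at
  // createdAtMs is rolled by time, assuming it never hits the size limit first.
  // Requires checkIntervalMs > 0 and rolloverIntervalMs >= 0 so the search terminates.
  def firstTimedRoll(p: Policy, createdAtMs: Long): Long = {
    val firstCheck = (createdAtMs / p.checkIntervalMs + 1) * p.checkIntervalMs
    Iterator
      .iterate(firstCheck)(_ + p.checkIntervalMs)
      .find(t => rollOnTimer(p, createdAtMs, t))
      .get
  }

  def main(args: Array[String]): Unit = {
    // Values matching the DDL: 1MB (1 MiB), 2 min rollover, 1 min check interval
    val p = Policy(1024L * 1024, 2 * 60 * 1000L, 60 * 1000L)
    // A file opened 30s after start is only rolled at the 3-minute check
    println(firstTimedRoll(p, 30 * 1000L))
  }
}
```

Under this model, a part file can stay open for up to roughly the rollover interval plus one check interval (here about 3 minutes) before it is rolled by time.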
Write the program:

```scala
package com.itbaizhan.flink.scala.tableapi_sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object FileSystemSinkDemo {
  def main(args: Array[String]): Unit = {
    // Build the environment settings and select streaming mode
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    // Create the Table execution environment
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)
    // In streaming mode the filesystem sink commits part files only when a checkpoint
    // completes, so enable periodic checkpointing (10s is an arbitrary demo value)
    tableEnv.getConfig.getConfiguration
      .setString("execution.checkpointing.interval", "10s")
    // Create the Kafka source table with SQL DDL
    tableEnv.executeSql("CREATE TABLE tb_station (\n" +
      " `sid` STRING,\n" +
      " `call_type` STRING,\n" +
      " `call_time` BIGINT,\n" +
      " `duration` BIGINT\n" +
      ") WITH (\n" +
      " 'connector' = 'kafka',\n" +
      " 'topic' = 'station-topic',\n" +
      " 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',\n" +
      " 'properties.group.id' = 'gid4',\n" +
      " 'scan.startup.mode' = 'earliest-offset',\n" +
      " 'format' = 'csv'\n" +
      ")"
    )
    // Create the FileSystem sink table with SQL DDL
    tableEnv.executeSql("CREATE TEMPORARY TABLE tb_station_sink (\n" +
      " `sid` STRING,\n" +
      " `call_type` STRING,\n" +
      " `call_time` BIGINT,\n" +
      " `duration` BIGINT\n" +
      ") WITH (\n" +
      " 'connector' = 'filesystem',\n" +
      " 'path' = 'datas/stations',\n" +
      " 'format' = 'json',\n" +
      " 'sink.rolling-policy.file-size' = '1MB',\n" +
      " 'sink.rolling-policy.rollover-interval' = '2 min',\n" +
      " 'sink.rolling-policy.check-interval' = '1 min',\n" +
      " 'sink.parallelism' = '1'\n" +
      ")"
    )
    // Query the source table and insert the rows into the sink table, which writes the
    // files under the configured path; await() blocks until the job terminates, which
    // for the unbounded Kafka source keeps the program running
    tableEnv.executeSql(
      "insert into tb_station_sink" +
        " select sid,call_type,call_time,duration" +
        " from tb_station")
      .await()
  }
}
```
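The job reads CSV-formatted values from Kafka and writes each row as a JSON object. As a rough illustration of that per-row conversion only, the pure-Scala sketch below parses one line and renders it as a compact JSON line with the sink's column names in schema order. It assumes plain, unquoted `sid,call_type,call_time,duration` lines; the object name and the sample values are hypothetical, and this is not how Flink's csv or json formats are implemented.

```scala
object CsvToJsonSketch {
  // One station record, mirroring the columns of tb_station / tb_station_sink
  final case class Station(sid: String, callType: String, callTime: Long, duration: Long)

  // Assumes a plain "sid,call_type,call_time,duration" line without quoting or embedded commas
  def parseCsv(line: String): Station =
    line.split(",", -1).map(_.trim) match {
      case Array(sid, callType, callTime, duration) =>
        Station(sid, callType, callTime.toLong, duration.toLong)
      case other =>
        throw new IllegalArgumentException(s"expected 4 fields, got ${other.length}: $line")
    }

  // Minimal JSON string quoting: escapes backslashes and double quotes only
  private def quote(value: String): String =
    "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Renders one row as a single-line JSON object, keys in schema order
  def toJsonLine(row: Station): String =
    s"""{"sid":${quote(row.sid)},"call_type":${quote(row.callType)},"call_time":${row.callTime},"duration":${row.duration}}"""

  def main(args: Array[String]): Unit = {
    // Hypothetical sample line; real records come from the MakeStationsData generator
    val line = "station_0,success,1655000000000,30"
    println(toJsonLine(parseCsv(line)))
  }
}
```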
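Test:
1. Run the MakeStationsData class to generate station data into station-topic.
2. Run the FileSystemSinkDemo class.
3. Check the output files under datas/stations.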
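When checking the output, note that the part file currently being written (and files still waiting for a checkpoint) are hidden files whose names start with a dot; committed files have no leading dot. The small helper below is a hypothetical utility, not something Flink provides, that lists a directory and separates the two groups. It relies only on that leading-dot convention and defaults to the datas/stations path used above.

```scala
import java.io.File

object OutputFilesSketch {
  // Recursively collects regular files; returns Nil if the path does not exist
  private def filesUnder(f: File): List[File] =
    if (f.isDirectory) Option(f.listFiles).map(_.toList).getOrElse(Nil).flatMap(filesUnder)
    else if (f.isFile) List(f)
    else Nil

  // Returns (committed, inProgress) file names, treating dot-prefixed (hidden)
  // files as in-progress or pending part files
  def partition(dir: File): (List[String], List[String]) =
    filesUnder(dir).map(_.getName).sorted.partition(name => !name.startsWith("."))

  def main(args: Array[String]): Unit = {
    val dir = new File(args.headOption.getOrElse("datas/stations"))
    val (committed, inProgress) = partition(dir)
    println(s"committed:   ${committed.mkString(", ")}")
    println(s"in progress: ${inProgress.mkString(", ")}")
  }
}
```

If only dot-prefixed files ever show up, checkpointing is most likely not enabled, since those files are only committed when a checkpoint completes.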