大数据全系列 教程
1869个小节阅读:465k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例演示：使用 SQL 方式读取 datas/IMDB_10000.csv 电影评分文本数据，将其创建为表，查询前 100 条记录，并将结果打印到控制台。
需要添加以下依赖（CSV 格式支持）：
xxxxxxxxxx
<!-- Flink CSV format; required because the source table below uses 'format' = 'csv'. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.14.6</version>
</dependency>
创建 Source 表的 DDL 脚本：
xxxxxxxxxx
-- Source table over the IMDB CSV file. Column order follows the file layout:
-- title,year,certificate,runtime,genre,desc,rating,votes
-- `year` and `desc` are reserved keywords in Flink SQL, so identifiers are backtick-quoted.
CREATE TABLE imdb (
  `title` STRING,
  `year` STRING,
  `certificate` STRING,
  `runtime` STRING,
  `genre` STRING,
  `desc` STRING,
  `rating` DOUBLE,
  `votes` INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'datas/IMDB_10000.csv',
  'format' = 'csv',
  'csv.field-delimiter' = ',',
  -- tolerate rows/fields that cannot be parsed instead of failing the job
  'csv.ignore-parse-errors' = 'true'
)
创建 Sink 表（print 连接器）的 DDL 脚本：
-- Sink table: the 'print' connector writes every row it receives to standard output.
CREATE TABLE print_table (
  `title`  STRING,
  `rating` DOUBLE,
  `votes`  INT
)
WITH ('connector' = 'print')
代码实现:
xxxxxxxxxx
package com.itbaizhan.flink.scala.tableapi_sql
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableResult}
/**
 * Getting-started demo for Flink SQL on the Table API.
 *
 * Registers a filesystem/CSV source table over `datas/IMDB_10000.csv`, registers a `print`
 * sink table, and inserts the first 100 rows whose `votes` is not null into the sink so
 * they are written to the console.
 */
object GettingStartedSQLDemo {

  /**
   * DDL for the source table. Column order follows the CSV layout:
   * title,year,certificate,runtime,genre,desc,rating,votes.
   * `year` and `desc` are reserved keywords in Flink SQL, hence the backticks.
   */
  private val CreateImdbTableSql: String =
    """CREATE TABLE imdb (
      |  `title` STRING,
      |  `year` STRING,
      |  `certificate` STRING,
      |  `runtime` STRING,
      |  `genre` STRING,
      |  `desc` STRING,
      |  `rating` DOUBLE,
      |  `votes` INT
      |) WITH (
      |  'connector' = 'filesystem',
      |  'path' = 'datas/IMDB_10000.csv',
      |  'format' = 'csv',
      |  'csv.field-delimiter' = ',',
      |  'csv.ignore-parse-errors' = 'true'
      |)""".stripMargin

  /** DDL for the sink table; the 'print' connector writes received rows to stdout. */
  private val CreatePrintTableSql: String =
    """CREATE TABLE print_table (
      |  title STRING,
      |  rating DOUBLE,
      |  votes INT
      |) WITH (
      |  'connector' = 'print'
      |)""".stripMargin

  /**
   * Copies the first 100 rows with a non-null `votes` into the print sink.
   * NOTE(review): the null filter presumably drops lines whose `votes` failed to parse
   * (tolerated because of 'csv.ignore-parse-errors') — confirm against the data file.
   */
  private val InsertTop100Sql: String =
    """INSERT INTO print_table
      |SELECT title, rating, votes
      |FROM imdb
      |WHERE votes IS NOT NULL
      |LIMIT 100""".stripMargin

  def main(args: Array[String]): Unit = {
    // 1. Create the table environment in streaming mode.
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    // 2. Register the source table with SQL DDL.
    tableEnv.executeSql(CreateImdbTableSql)

    // 3. Option 1 (kept for reference): run the query and print the TableResult directly.
    // val tableResult: TableResult = tableEnv.executeSql(
    //   "select title,rating,votes" +
    //   " from imdb" +
    //   " where votes is not null" +
    //   " limit 100")
    // tableResult.print()

    // 3. Option 2: register a sink table with SQL DDL and insert the query result into it.
    tableEnv.executeSql(CreatePrintTableSql)

    // executeSql submits the INSERT job and returns without waiting for it to finish.
    // Block until the job completes so main() does not return before the rows are printed.
    val insertResult: TableResult = tableEnv.executeSql(InsertTop100Sql)
    insertResult.await()
  }
}