大数据全系列 教程
1869个小节阅读:467.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例演示:使用TableAPI方式,读取
datas/IMDB_10000.csv
电影评分文本数据,创建为表,查询前100条,结果打印控制台。
xxxxxxxxxx
package com.itbaizhan.flink.scala.tableapi_sql
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}
object GettingStartedTableApiDemo {
def main(args: Array[String]): Unit = {
//TODO 1.构建表执行环境 [重点掌握该方式]
//构建环境配置对象,并指定批处理或流处理
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build()
val tableEnv: TableEnvironment = TableEnvironment.create(settings)
//TODO 2.使用SQL DDL创建Source表
//title year certificate runtime genre desc rating votes
tableEnv.executeSql(
"CREATE TABLE imdb (\n" +
" title STRING, \n" +
" `year` STRING,\n" +
" certificate STRING, \n" +
" runtime STRING,\n" +
" genre STRING,\n" +
" desc STRING,\n" +
" rating DOUBLE,\n" +
" votes INT\n) " +
"WITH (\n" +
" 'connector' = 'filesystem', \n" +
" 'path' = 'datas/IMDB_10000.csv', \n" +
" 'format' = 'csv',\n" +
" 'csv.field-delimiter' = ',',\n" +
" 'csv.ignore-parse-errors' = 'true'\n)")
//TODO 3.TableAPI执行查询并返回Table对象
//不引入下行,$("title")无法识别
import org.apache.flink.table.api.Expressions._
//执行查询的表
val resultTable: Table = tableEnv.from("imdb")
//指定查询的列
.select($("title"),$("rating"),$("votes"))
//过滤掉投票数为null的数据
.where($("votes").isNotNull)
//指定查询的行数
.limit(100)
//TODO 4.1 方式一执行查询并输出
//resultTable.execute().print()
//TODO 4.2方式二:使用SQL DDL创建Sink表
tableEnv.executeSql("CREATE TABLE print_table (\n" +
" title STRING,\n" +
" rating DOUBLE,\n" +
" votes INT\n) " +
"WITH (\n" +
" 'connector' = 'print'\n)")
//将查询结果插入sink表
resultTable.executeInsert("print_table")
}
}