大数据全系列 教程
1869个小节阅读:465.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需要的Maven依赖:
xxxxxxxxxx
<!-- Table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.6</version>
</dependency>
伪代码便于理解,不能执行:
xpackage com.itbaizhan.flink.scala
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object TableApiSQLFirstDemo {
def main(args: Array[String]): Unit = {
//TODO 1.构建表的执行环境对象 该方式了解即可
/*val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)*/
//TODO 1.构建表的执行环境对象 [重点掌握]
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
//流处理
.inStreamingMode()
//批处理
//.inBatchMode()
.build()
val tableEnv: TableEnvironment = TableEnvironment.create(settings)
//TODO 2.使用SQL DDL创建Source表
tableEnv.executeSql("CREATE TABLE inTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = 'node2:9092',\n" +
" 'properties.group.id' = 'grp1',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'csv'\n" +
")"
)
//TODO 3.使用SQL DDL创建Sink表
tableEnv.executeSql("CREATE TABLE outTable (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status BOOLEAN,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://node1:3306/mydb',\n" +
" 'table-name' = 'users'\n" +
")"
)
//TODO 4.查询Source表中符合条件的的数据,并封装到Table对象中
//方式一:使用TableAPI的方式进行查询
val table1: Table = tableEnv.from("inTable")
//指定查询的列
.select($"user_id", $("item_id"))
//方式二:使用SQL query进行查询 【推荐】
val table2: Table = tableEnv.sqlQuery("select * user_id,item_id from inTable")
//TODO 5.将4查询分析的结果插入到Sink中
val tableResult1: TableResult = table1.executeInsert("outTable")
val tableResult2: TableResult = table2.executeInsert("outTable")
//TODO 4-5 整合到一起
tableEnv.executeSql("INSERT INTO outTable " +
"SELECT * FROM inTable")
}
}