大数据全系列 教程
1869个小节阅读:467.4k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
The JDBC connector allows for ==reading data== from and writing data into any relational databases with a JDBC driver.
案例需求:使用Connector JDBCSource从MySQL的flink_db.tb_station表中读取数据并输出到控制台。
编写Connector JDBC Source的SQL DDL语句:
xxxxxxxxxx
CREATE TEMPORARY TABLE tb_station_source (
`sid` STRING,
`call_type` STRING,
`call_time` BIGINT,
`duration` BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://node1:3306/flink_db',
'table-name' = 'tb_station',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456'
)
编写程序:
xxxxxxxxxx
package com.itbaizhan.flink.scala.tableapi_sql
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
object JDBCSourceDemo {
def main(args: Array[String]): Unit = {
//构建环境配置对象,并指定流处理
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build()
//创建Table执行环境对象
val tableEnv: TableEnvironment = TableEnvironment.create(settings)
// 使用SQL DDL创建临时Source表
tableEnv.executeSql("CREATE TEMPORARY TABLE tb_station_source (\n" +
" `sid` STRING,\n" +
" `call_type` STRING,\n" +
" `call_time` BIGINT,\n" +
" `duration` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://node1:3306/flink_db',\n" +
" 'table-name' = 'tb_station',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456'\n" +
")"
)
//执行查询并输出
tableEnv.executeSql(
"select sid,call_type,call_time,duration" +
" from tb_station_source").print()
}
}
测试:运行JDBCSourceDemo类查看控制台。