大数据全系列 教程
1869个小节阅读:467.4k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
从Flink 1.11版本开始,增加JDBC Connector连接器,可以将DataStream数据直接保存RDBMS表中。
添加Maven依赖:
xxxxxxxxxx
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.6</version>
</dependency>
案例:向flink_db.tb_product表中添加两条商品数据。
xxxxxxxxxx
package com.itbaizhan.flink.scala.sink
import com.itbaizhan.flink.scala.source.{MyDefinedSource, Product, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.sql.PreparedStatement
import java.util.concurrent.TimeUnit
object JdbcSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//添加隐式转
import org.apache.flink.streaming.api.scala._
//集合数据源
val ds: DataStream[Product] = env.fromCollection(Array[Product](
Product(0, "jdbcsink1", 8.88),
Product(0, "jdbcsink2", 9.98)
))
//构建JdbcSink对象
val jdbcSink: SinkFunction[Product] = JdbcSink.sink(
//第一个参数:sql语句
"insert into tb_product(name,price) values(?,?)",
//第二个参数:为占位符赋值的设置
new JdbcStatementBuilder[Product] {
override def accept(pstat: PreparedStatement, prod: Product): Unit = {
//为第一个占位符name赋值,注意类型
pstat.setString(1, prod.name)
//为第二个占位符price赋值,注意类型
pstat.setDouble(2, prod.price)
}
}, //第三个参数:数据连接的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
//设置数据库url地址
.withUrl("jdbc:mysql://node1:3306/flink_db")
//设置驱动类
.withDriverName("com.mysql.cj.jdbc.Driver")
//设置连接数据库的用户名和密码
.withUsername("root")
.withPassword("123456")
.build()
)
//将sink和ds绑定
ds.addSink(jdbcSink)
env.execute("JdbcSinkDemo")
}
}