大数据全系列 教程
1869个小节阅读:467.5k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
当然你可以自己定义Sink,有两种实现方式:1、实现SinkFunction接口。2、实现RichSinkFunction类。后者增加了生命周期的管理功能。比如需要在Sink初始化的时候创建连接对象,则最好使用第二种。
案例需求:把Product对象写入Mysql数据库中。
xxxxxxxxxx
package com.itbaizhan.flink.scala.sink
import com.itbaizhan.flink.scala.source.Product
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.sql.{Connection, DriverManager, PreparedStatement}
object CustomSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//添加隐式转
import org.apache.flink.streaming.api.scala._
//准备数据
val ds: DataStream[Product] = env.fromCollection(Array[Product](
Product(0, "CustomSink1", 6.66),
Product(0, "CustomSink2", 7.66)
))
//构建自定义Sink对象,并绑定到ds上
ds.addSink(new MyCustomSink)
env.execute("CustomSinkDemo")
}
}
//自定义类
class MyCustomSink extends RichSinkFunction[Product]{
//生命数据库连接对象
private var conn:Connection = _
private var pstat:PreparedStatement = _
//生命周期管理方法,在Sink初始化的时候调用
override def open(parameters: Configuration): Unit = {
//构建数据库连接对象
conn = DriverManager.getConnection("jdbc:mysql://node1:3306/flink_db","root","123456")
//预编译sql语句
pstat = conn.prepareStatement("insert into tb_product(name,price) values(?,?)")
}
//循环调用该方法,输出一条数据调用一次
override def invoke(prod: Product, context: SinkFunction.Context): Unit = {
//为占位符赋值
pstat.setString(1,prod.name)
pstat.setDouble(2,prod.price)
//执行插入操作
pstat.executeUpdate()
}
//生命周期管理方法,在Sink结束的时候调用
override def close(): Unit = {
if(pstat!=null){
pstat.close()
}
if(conn!=null){
conn.close()
}
}
}