大数据全系列 教程
1869个小节阅读:467.5k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:从MySQL中实时加载数据,要求每隔10秒钟查询flinkdb.tb_product表中的最新全量数据。继承RichSourceFunction
(多功能)来自定义无并行度或继承RichParallelSourceFunction
(多功能)来自定义有并行度。
创建数据flink_db
创建表product
xxxxxxxxxx
create table `flink_db`.`tb_product`(
`id` int NOT NULL AUTO_INCREMENT ,
`name` varchar(255) NOT NULL ,
`price` double ,
PRIMARY KEY (`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
准备插入语句
xxxxxxxxxx
#先插入一条
INSERT INTO flink_db.tb_product(`name`,`price`) VALUES('iPhone14Pro',8899);
#运行程序后,每隔10秒插入一条
INSERT INTO flink_db.tb_product(`name`,`price`) VALUES('iPhone14',5899);
INSERT INTO flink_db.tb_product(`name`,`price`) VALUES('Meta 50',6799);
添加依赖
xxxxxxxxxx
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
编写商品实体类和自定义Source类
xxxxxxxxxx
//定义商品实体类
case class Product(id:Int,name:String,price:Double)
class MySQLSource extends RichSourceFunction[Product]{
//是否生成数据的标志
private var flag = true
//在运行run方法之前初始化操作,比如获取连接
override def open(parameters: Configuration): Unit = super.open(parameters)
override def run(sourceContext: SourceFunction.SourceContext[Product]): Unit = {
while(flag){
}
}
//当将Job作业取消时调用,停止run中的循环
override def cancel(): Unit = {
flag = false
}
//关闭数据库连接
override def close(): Unit = super.close()
}
编码实现
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.concurrent.TimeUnit
//定义商品实体类
case class Product(id:Int,name:String,price:Double)
class MySQLSource extends RichSourceFunction[Product]{
//是否生成数据的标志
private var flag = true
private var conn:Connection = _
private var pstat:PreparedStatement = _
private var resultSet:ResultSet = _
//在运行run方法之前初始化操作,比如获取连接
override def open(parameters: Configuration): Unit = {
//加载驱动
Class.forName("com.mysql.cj.jdbc.Driver")
//获取连接
conn = DriverManager.getConnection("jdbc:mysql://node1:3306/flink_db?useSSL=false","root","123456")
//预编译sql语句并构建pstat对象
pstat = conn.prepareStatement("select id,name,price from tb_product")
}
override def run(sourceContext: SourceFunction.SourceContext[Product]): Unit = {
while(flag){
//执行查询
resultSet = pstat.executeQuery()
//获取数据
while(resultSet.next()){
var product:Product = Product(resultSet.getInt("id"),
resultSet.getString("name"),
resultSet.getDouble("price"))
//发送数据
sourceContext.collect(product)
}
//每个4秒加载一次数据
TimeUnit.SECONDS.sleep(4)
}
}
//当将Job作业取消时调用,停止run中的循环
override def cancel(): Unit = {
flag = false
}
//关闭数据库连接
override def close(): Unit = {
if(resultSet!=null){
resultSet.close()
}
if(pstat!=null){
pstat.close()
}
if(conn!=null){
conn.close()
}
}
}
object DefinedMySqlSourceDemo {
def main(args: Array[String]): Unit = {
//1.构建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2.数据源
import org.apache.flink.api.scala._
val prodDS: DataStream[Product] = env.addSource(new MySQLSource())
//3.数据转换
//4.数据sink
prodDS.print()
//5.触发执行
env.execute("mysql def source")
}
}