大数据全系列 教程
1869个小节阅读:467.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例演示:基于DataStream实现Flink CDC从MySQL数据库flink_db.tb_product表实时捕获增量数据,并打印在控制台。
添加Maven依赖:
xxxxxxxxxx
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
编码实现:
xxxxxxxxxx
package com.itbaizhan.flink.java.cdc;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CdcMySQLDataStreamDemo {
public static void main(String[] args) throws Exception {
//1.构建MySqlSource对象
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("192.168.20.101")
.port(3306)
.databaseList("flink_db") // set captured database
.tableList("flink_db.tb_product") // set captured table
.username("root")
.password("123456")
/**initial():首次启动时对监控的数据先全量,后增量
* earliest():只是从二进制binlog的开头读取。
* latest():只是从二进制binlog的结尾读取。
*/
.startupOptions(StartupOptions.initial())
// converts SourceRecord to JSON String
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
//2.构建执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置checkpoint
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink_checkpoint1");
//3.添加数据源
DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
//...转换...
//4.输出
dataStreamSource.printToErr().setParallelism(1);
//5.执行
env.execute("CdcMySQLDataStreamDemo");
}
}
Bug解决办法:
首先依赖调整
xxxxxxxxxx
<!--<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
然后将软件\log4j2.xml
拷贝到当前项目的resources目录下。
INSERT INTO tb_product(id,NAME,price) VALUES(9,'macbook1',9999);
INSERT INTO tb_product(id,NAME,price) VALUES(10,'macbook2',8888);