大数据全系列 教程
1869个小节阅读:467k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例演示:基于Flink SQL实现Flink CDC从MySQL数据库flink_db.tb_product表实时获取增量数据,打印控制台。
代码:
xxxxxxxxxx
package com.itbaizhan.flink.java.cdc;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class CdcSQLDemo {
public static void main(String[] args) {
//1.创建执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置checkpoint
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink_checkpoint2");
//3.设置并行度
env.setParallelism(1);
//4.构建环境配置对象,指定流式处理
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
//5.创建表环境对象
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//6.创建输入表,使用CDC-SQL-MySQL获取数据
tableEnv.executeSql(
"CREATE TABLE tb_binlog (\n" +
" id INT NOT NULL,\n" +
" name STRING,\n" +
" price DOUBLE,\n" +
" PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'node1',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'flink_db',\n" +
" 'table-name' = 'tb_product',\n" +
" 'scan.startup.mode' = 'initial'\n" +
")"
);
//7.查询数据
TableResult tableResult = tableEnv.executeSql("select id,name,price from tb_binlog");
//8.输出
tableResult.print();
}
}
运行程序后查看控制台。
插入数据后再次查看控制台:
INSERT INTO tb_product(id,NAME,price) VALUES(11,'macbook3',9998);
INSERT INTO tb_product(id,NAME,price) VALUES(12,'macbook4',8886);