大数据全系列 教程
1869个小节阅读:464.8k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:通过实时计算,找出所有超速20%及以上(即实际车速 ≥ 限速 × 1.2)的车辆,并将结果写入MySQL数据库的 traffic_monitor.t_speeding_info 表中。
思路分析:
1.使用FlinkCDC读取MySQL的 traffic_monitor.t_monitor_info 表的数据
2.使用SQL Connector KafkaSource读取主题 traffic-topic 中的数据
3.使用SQL DDL创建JDBC Sink表
4.关联查询并插入到Sink表,实现写数据到MySQL的 traffic_monitor.t_speeding_info 表
添加依赖:
<!-- MySQL JDBC driver; the JDBC sink table below names com.mysql.cj.jdbc.Driver as its 'driver' -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<!-- Flink CDC connector for MySQL; backs the 'mysql-cdc' connector of tb_monitor_info -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- CSV format; the Kafka source table tb_traffic_log is declared with 'format' = 'csv' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink JDBC connector; backs the 'jdbc' connector of tb_speed_sink -->
<!-- NOTE(review): ${flink.version} and ${scala.binary.version} are presumably defined in the project's pom properties; confirm -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码如下:
package com.itbaizhan.traffic.monitor
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Real-time speeding monitor implemented with Flink SQL.
 *
 * The job joins the per-monitor speed limits read from MySQL table
 * `traffic_monitor.t_monitor_info` (via the `mysql-cdc` connector) with the vehicle
 * records consumed from Kafka topic `traffic-topic`, and writes every record whose
 * speed is at least 1.2 times the configured limit to MySQL table
 * `traffic_monitor.t_speeding_info` (via the `jdbc` connector).
 */
object LimitSpeedMonitor {

  /** Interval between two checkpoints, in milliseconds. */
  private val CheckpointIntervalMs: Long = 1000L

  /** Location where checkpoint data is stored. */
  private val CheckpointStorage: String = "file:///E:/flink_checkpoint1"

  /** Source table 1: speed limits per monitor, captured from MySQL through Flink CDC. */
  private val MonitorInfoDdl: String =
    """CREATE TABLE tb_monitor_info (
      |  area_id STRING,
      |  road_id STRING,
      |  monitor_id STRING,
      |  speed_limit INT NOT NULL,
      |  PRIMARY KEY(area_id,road_id,monitor_id) NOT ENFORCED
      |) WITH (
      |  'connector' = 'mysql-cdc',
      |  'hostname' = 'node1',
      |  'port' = '3306',
      |  'username' = 'root',
      |  'password' = '123456',
      |  'database-name' = 'traffic_monitor',
      |  'table-name' = 't_monitor_info',
      |  'scan.startup.mode' = 'initial'
      |)""".stripMargin

  /** Source table 2: CSV-encoded vehicle records read from Kafka topic `traffic-topic`. */
  private val TrafficLogDdl: String =
    """CREATE TABLE tb_traffic_log (
      |  `action_time` BIGINT,
      |  `monitor_id` STRING,
      |  `camera_id` STRING,
      |  `car` STRING,
      |  `speed` DOUBLE,
      |  `road_id` STRING,
      |  `area_id` STRING
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'traffic-topic',
      |  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
      |  'properties.group.id' = 'gid1',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'csv',
      |  'csv.field-delimiter' = ','
      |)""".stripMargin

  /**
   * Sink table: speeding records written to MySQL through JDBC.
   *
   * Every row is written with `id = 0` (see [[InsertSpeedingSql]]) while `id` is the
   * declared primary key. A JDBC sink with a primary key buffers rows keyed by that key,
   * so rows sharing a key inside one flush batch overwrite each other and only the last
   * one is written. `'sink.buffer-flush.max-rows' = '1'` flushes after every row so that
   * no speeding record is dropped by that buffering.
   */
  private val SpeedSinkDdl: String =
    """CREATE TABLE tb_speed_sink (
      |  `id` INT,
      |  `car` STRING,
      |  `monitor_id` STRING,
      |  `road_id` STRING,
      |  `real_speed` DOUBLE,
      |  `limit_speed` INT,
      |  `action_time` BIGINT,
      |  PRIMARY KEY (id) NOT ENFORCED
      |) WITH (
      |  'connector' = 'jdbc',
      |  'url' = 'jdbc:mysql://node1:3306/traffic_monitor',
      |  'table-name' = 't_speeding_info',
      |  'driver' = 'com.mysql.cj.jdbc.Driver',
      |  'username' = 'root',
      |  'password' = '123456',
      |  'sink.buffer-flush.max-rows' = '1'
      |)""".stripMargin

  /**
   * Joins both sources on (area_id, road_id, monitor_id) and keeps the records whose
   * speed is at least 120% of the limit.
   *
   * `id` is written as 0 so that MySQL substitutes the auto-increment value.
   * NOTE(review): this relies on `t_speeding_info.id` being AUTO_INCREMENT and on the
   * MySQL server not running with NO_AUTO_VALUE_ON_ZERO -- confirm against the schema.
   */
  private val InsertSpeedingSql: String =
    """INSERT INTO tb_speed_sink
      |SELECT
      |  0 AS id,
      |  ttl.car,
      |  ttl.monitor_id,
      |  ttl.road_id,
      |  ttl.speed AS real_speed,
      |  tmi.speed_limit AS limit_speed,
      |  ttl.action_time
      |FROM tb_monitor_info tmi, tb_traffic_log ttl
      |WHERE tmi.area_id = ttl.area_id
      |  AND tmi.road_id = ttl.road_id
      |  AND tmi.monitor_id = ttl.monitor_id
      |  AND ttl.speed >= tmi.speed_limit * 1.2""".stripMargin

  /**
   * Builds the streaming table environment, registers the three tables and runs the
   * continuous INSERT job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpointing configuration.
    streamEnv.enableCheckpointing(CheckpointIntervalMs, CheckpointingMode.AT_LEAST_ONCE)
    streamEnv.getCheckpointConfig.setCheckpointStorage(CheckpointStorage)

    // Table environment in streaming mode.
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, settings)

    // Register the two source tables and the sink table.
    // For debugging, a source can be inspected with e.g.
    //   tableEnv.executeSql("select * from tb_monitor_info").print()
    Seq(MonitorInfoDdl, TrafficLogDdl, SpeedSinkDdl).foreach(ddl => tableEnv.executeSql(ddl))

    // executeSql submits the INSERT job asynchronously; await() blocks until the job
    // terminates so that main does not return while the streaming job is still running.
    tableEnv.executeSql(InsertSpeedingSql).await()
  }
}