大数据全系列 教程
1869个小节阅读:467.9k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:城市交通中,有些车辆需要实时轨迹跟踪,这些需要跟踪轨迹的车辆保存在城市违法表 t_violation_list 中。系统需要实时打印这些车辆经过的卡口,并且把轨迹数据插入数据表 t_track_info(Hbase数据库)中。(注:下面的示例代码实际是通过 JDBC 连接器把轨迹数据写入 MySQL 的 traffic_monitor.t_track_info 表。)
车辆轨迹数据字段:car 车牌号码、actionTime 时间、monitorId 卡口号、roadId 道路id、areaId 区域id、speed 时速。
代码实现:
xxxxxxxxxx
package com.itbaizhan.traffic.distribution
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
//违法车辆轨迹跟踪
object RtCarTracker {
def main(args: Array[String]): Unit = {
  // Flink-side name of the JDBC sink table. Defined once so the CREATE TABLE and the
  // INSERT INTO below always refer to the same table (they previously disagreed:
  // the DDL registered tb_track_sink2 while the INSERT targeted tb_track_sink).
  val trackSinkTable = "tb_track_sink"

  val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Checkpoint every 1000 ms with AT_LEAST_ONCE semantics, stored on the local file system.
  // NOTE(review): the checkpoint directory is a hard-coded Windows path; adjust per deployment.
  streamEnv.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
  streamEnv.getCheckpointConfig.setCheckpointStorage("file:///E:/flink_checkpoint2")

  // Table environment in streaming mode, bridged onto the stream environment above.
  val settings: EnvironmentSettings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build()
  val tableEnv = StreamTableEnvironment.create(streamEnv, settings)

  // 1. Source: the watched-vehicle list from MySQL traffic_monitor.t_violation_list,
  //    read through the mysql-cdc connector with scan.startup.mode = 'initial'.
  // NOTE(review): database credentials are hard-coded here and in the JDBC sink below.
  tableEnv.executeSql(
    """CREATE TABLE tb_violation_list (
      |  id INT,
      |  car STRING,
      |  violation STRING,
      |  create_time BIGINT,
      |  detail STRING,
      |  PRIMARY KEY(id) NOT ENFORCED
      |) WITH (
      |  'connector' = 'mysql-cdc',
      |  'hostname' = 'node1',
      |  'port' = '3306',
      |  'username' = 'root',
      |  'password' = '123456',
      |  'database-name' = 'traffic_monitor',
      |  'table-name' = 't_violation_list',
      |  'scan.startup.mode' = 'initial'
      |)""".stripMargin)
  // Debug: uncomment to print the CDC source.
  //tableEnv.executeSql("select * from tb_violation_list").print()

  // 2. Source: comma-separated traffic log records from Kafka topic traffic-topic,
  //    consumed from the earliest offset with consumer group gid6.
  tableEnv.executeSql(
    """CREATE TABLE tb_traffic_log (
      |  `action_time` BIGINT,
      |  `monitor_id` STRING,
      |  `camera_id` STRING,
      |  `car` STRING,
      |  `speed` DOUBLE,
      |  `road_id` STRING,
      |  `area_id` STRING
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'traffic-topic',
      |  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
      |  'properties.group.id' = 'gid6',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'csv',
      |  'csv.field-delimiter' = ','
      |)""".stripMargin)
  // Debug: uncomment to print the Kafka source.
  //tableEnv.executeSql("select * from tb_traffic_log").print()

  // 3. Sink: JDBC table mapped onto MySQL traffic_monitor.t_track_info.
  tableEnv.executeSql(
    s"""CREATE TABLE $trackSinkTable (
       |  `id` INT,
       |  `car` STRING,
       |  `action_time` BIGINT,
       |  `monitor_id` STRING,
       |  `road_id` STRING,
       |  `area_id` STRING,
       |  `speed` DOUBLE,
       |  PRIMARY KEY (id) NOT ENFORCED
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = 'jdbc:mysql://node1:3306/traffic_monitor',
       |  'table-name' = 't_track_info',
       |  'driver' = 'com.mysql.cj.jdbc.Driver',
       |  'username' = 'root',
       |  'password' = '123456'
       |)""".stripMargin)

  // 4. Keep only traffic records whose car is on the violation list, and write them to the sink.
  //    The literal 0 fills the id column, relying on MySQL to generate a fresh AUTO_INCREMENT
  //    value for 0. This assumes t_track_info.id is AUTO_INCREMENT and that NO_AUTO_VALUE_ON_ZERO
  //    is not set; confirm against the MySQL schema.
  // NOTE(review): because the sink declares PRIMARY KEY(id), the JDBC connector writes in upsert
  //    mode and reduces buffered rows by that key before each flush. With every row carrying
  //    id = 0, several track records landing in the same flush may collapse into one. Verify,
  //    and consider a key that is unique per track record.
  tableEnv.executeSql(
    s"""INSERT INTO $trackSinkTable
       |SELECT 0, ttl.car, ttl.action_time, ttl.monitor_id, ttl.road_id, ttl.area_id, ttl.speed
       |FROM tb_traffic_log AS ttl, tb_violation_list AS tvl
       |WHERE ttl.car = tvl.car""".stripMargin)
}