大数据全系列 教程
1869个小节阅读:464.9k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:当某个卡口中出现一辆行驶的汽车,我们可以通过摄像头识别车牌号,然后在10秒内,另外一个卡口(或者当前卡口)也识别到了同样车牌的车辆,那么就认定这两辆车之中有涉嫌套牌车,因为一般情况下不可能有车辆在10秒内经过两个卡口。如果发现涉嫌套牌车,系统实时发出报警信息,同时这些存在套牌车嫌疑的车辆,写入Mysql数据库的结果表中,在后面的模块中,可以对这些违法车辆进行实时轨迹跟踪。
思路:
按照车牌进行分组,使用键控状态为每个车牌保存经过卡口的数据,比较两次的action_time是否小于10s,则将数据写入MySQL数据库中。
代码实现:
修改全局常量类GlobalConstant,添加样例类:
xxxxxxxxxx
/**套牌车告警信息类
* @param car 车牌号码
* @param firstMonitor 经过卡口号1
* @param secondMonitor 经过卡口号2
* @param warningTime 警告时间
* @param warnMsg 警告信息
*/
case class RepetitionCarWarningInfo(car:String,
firstMonitor:String,secondMonitor:String,warningTime:Long,
warnMsg:String)
由于处理结果需要写入mysql的traffic_monitor.t_violation_list
,需要扩展JdbcWriterDataSink类
xxxxxxxxxx
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.cj.jdbc.Driver")
conn =DriverManager.getConnection("jdbc:mysql://node1:3306/traffic_monitor","root","123456")
if(classType.getName.equals(classOf[AvgSpeedInfo].getName)){
......
} if(classType.getName.equals(classOf[RepetitionCarWarningInfo].getName)){
pst = conn.prepareStatement("insert into t_violation_list (car,violation,create_time) values (?,?,?)")
}//后续扩展
}
override def invoke(value: T, context: SinkFunction.Context): Unit = {
if(classType.getName.equals(classOf[AvgSpeedInfo].getName)){
......
} if(classType.getName.equals(classOf[RepetitionCarWarningInfo].getName)){
val info: RepetitionCarWarningInfo = value.asInstanceOf[RepetitionCarWarningInfo]
pst.setString(1,info.car)
pst.setString(2,info.warnMsg)
pst.setLong(3,info.warningTime)
pst.executeUpdate()
}//后续扩展
}
读取Kafka数据,进行分析,并将涉嫌套牌车辆写入数据库,代码实现:
xxxxxxxxxx
package com.itbaizhan.traffic.warning
import com.itbaizhan.traffic.util.{AvgSpeedInfo, JdbcWriterDataSink, RepetitionCarWarningInfo, TrafficLog}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Duration
object RepetitionCarAnalysis {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//设置并行度 推荐设置主题的分区相同
streamEnv.setParallelism(3)
//构建KafkaSource对象
val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
//设置kafka brokers
.setBootstrapServers("node2:9092,node3:9092,node4:9092")
//设置消费的主题
.setTopics("traffic-topic")
//设置消费者组
.setGroupId("gp4")
//设置最新消费偏移量
.setStartingOffsets(OffsetsInitializer.latest())
//设置返序列化类
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
//设置kafka数据源
val dataDS : DataStream[String] = streamEnv.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(),//暂不设置水位线
"KafkaSource")
//处理非法数据
val trafficLogDS: DataStream[TrafficLog] = dataDS.filter(_.split(",").length == 7)
//数据类型转换
.map(line => {
val arrs = line.split(",")
TrafficLog(arrs(0).toLong, arrs(1), arrs(2), arrs(3), arrs(4).toDouble, arrs(5), arrs(6))
})
.assignTimestampsAndWatermarks(
//数据出现乱序,一般不超过5秒
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
//从日志数据对象中,提取事件时间列
.withTimestampAssigner(new SerializableTimestampAssigner[TrafficLog] {
override def extractTimestamp(element: TrafficLog, recordTimestamp: Long): Long = {
//返回值必须是Long行
element.actionTime
}
})
)
//统计每个车牌
val resultDS: DataStream[RepetitionCarWarningInfo] = trafficLogDS.keyBy(_.car)
//<K, I, O> key类型,输入元素类型,输出元素类型
.process(new KeyedProcessFunction[String, TrafficLog, RepetitionCarWarningInfo] {
//lazy懒加载用到时在定义
lazy private val firstVS: ValueState[TrafficLog] = getRuntimeContext.getState(
new ValueStateDescriptor[TrafficLog]("first", classOf[TrafficLog]))
//分组后,一条日志调用一次该方法
override def processElement(value: TrafficLog,
ctx: KeyedProcessFunction[String, TrafficLog, RepetitionCarWarningInfo]#Context,
out: Collector[RepetitionCarWarningInfo]): Unit = {
//获取当前组key对应的状态数据
val tlogState: TrafficLog = firstVS.value()
if (tlogState == null) {
//表示当前数据就是第一次经过卡口的数据,将之保存到状态中
firstVS.update(value)
} else {
//表示该车辆已经在某个卡口中出现过了,需要判断一个时间差
val firstTime = tlogState.actionTime
val secondTime = value.actionTime
//计算两个时间差的秒数
val less = (secondTime - firstTime).abs / 1000
if (less < 10) { //时间差小于10秒,涉嫌套牌
val info = new RepetitionCarWarningInfo(value.car, tlogState.monitorId, value.monitorId,
ctx.timerService().currentProcessingTime(), "涉嫌套牌车")
//将封装的数据对象传递给下游
out.collect(info)
//清空当前key对应的状态
firstVS.clear()
} else { //暂时不是套牌车,但是还需要后面的数据判断
//如果当前value.actionTime大,则更新状态
if (secondTime > firstTime) {
firstVS.update(value)
}
}
}
}
})
//测试输出
//resultDS.print().setParallelism(1)
resultDS.addSink(new JdbcWriterDataSink[RepetitionCarWarningInfo](classOf[RepetitionCarWarningInfo]))
streamEnv.execute()
}
}