大数据全系列 教程
1869个小节阅读:465k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:危险驾驶是指在道路上驾驶机动车,追逐超速竞驶。我们规定:如果一辆机动车在2分钟内,超速通过卡口超过3次以上;而且每次超速的超过了规定速度的20%以上;这样的机动车涉嫌危险驾驶。系统需要实时找出这些机动车,并报警,追踪这些车辆的轨迹。注意:如果有些卡口没有设置限速值,可以设置一个城市默认限速60。
思路分析:
这样的需求在Flink也是有两种解决思路,第一:状态编程。第二:CEP编程。但是当前的需求使用状态编程过于复杂了。所以我们采用第二种。同时还要注意:Flume在采集数据的过程中出现了数据乱序问题,一般最长延迟5秒。
涉嫌危险驾驶的车辆信息保存到MySQL数据库表
t_violation_list
中,以便后面的功能中统一追踪这些车辆的轨迹。
代码实现:
修改全局常量类GlobalConstant,添加样例类:
xxxxxxxxxx
/**车辆危险驾驶的信息
* @param car 车牌号码
* @param msg 警告信息
* @param createTime 警告时间
* @param avgSpeed 平均速度
*/
case class DangeroursDrivingInfo(car:String,msg:String,
createTime:Long,avgSpeed:Double)
编码
xxxxxxxxxx
package com.itbaizhan.traffic.warning
import com.itbaizhan.traffic.util.{DangeroursDrivingInfo, JdbcWriterDataSink, MonitorLimitInfo, OutOfLimitSpeedInfo, TrafficLog}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import java.sql.DriverManager
import java.time.Duration
import scala.collection.Map
/**
* 危险驾驶分析
*/
object DangeroursDrivingAnalysis {
def main(args: Array[String]): Unit = {
//TODO 1.从kafka读取日志数据
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//设置并行度 推荐设置主题的分区相同
streamEnv.setParallelism(1)
//构建KafkaSource对象
val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
//设置kafka brokers
.setBootstrapServers("node2:9092,node3:9092,node4:9092")
//设置消费的主题
.setTopics("traffic-topic")
//设置消费者组
.setGroupId("gp5")
//设置消费偏移量:earliest
.setStartingOffsets(OffsetsInitializer.earliest())
//设置返序列化类
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
//设置kafka数据源
val dataDS : DataStream[String] = streamEnv.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(),//暂不设置水位线
"KafkaSource")
//处理非法数据
val trafficLogDS: DataStream[TrafficLog] = dataDS.filter(_.split(",").length == 7)
//数据类型转换
.map(line => {
val arrs = line.split(",")
TrafficLog(arrs(0).toLong, arrs(1), arrs(2), arrs(3), arrs(4).toDouble, arrs(5), arrs(6))
})
.assignTimestampsAndWatermarks(
//数据出现乱序,一般不超过5秒
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
//从日志数据对象中,提取事件时间列
.withTimestampAssigner(new SerializableTimestampAssigner[TrafficLog] {
override def extractTimestamp(element: TrafficLog, recordTimestamp: Long): Long = {
//返回值必须是Long行
element.actionTime
}
})
)
//TODO 2.转化数据流 TrafficLog->OutOfLimitSpeedInfo 包含每个卡口的限速,方便后续CEP匹配
val stream: DataStream[OutOfLimitSpeedInfo] = trafficLogDS.map(new OutOfSpeedFunction(60) )
//TODO 3.使用CEP模式匹配
//2、定义模式
val pattern: Pattern[OutOfLimitSpeedInfo, OutOfLimitSpeedInfo] = Pattern.begin[OutOfLimitSpeedInfo]("start")
.where(t => {
t.limitSpeed * 1.2 < t.realSpeed //超速20%
})
.timesOrMore(3) //超速三次以上,包括三次则就是危险假设
.greedy //在两分钟内,使用贪婪模式: 在2分钟内尽可能匹配更多次超速
//.within(Time.minutes(2))
.within(Time.seconds(10))//测试时可以改为10秒
//3、在数据流中匹配得到检测流
val ps: PatternStream[OutOfLimitSpeedInfo] = CEP.pattern(stream.keyBy(_.car),pattern)
//4、选择结果
ps.select(
//map代表两分钟匹配的所有结果,根据模式定义的里面的名字来确定map集合的size
// 当前代码中,map集合只有一条数据,key:start value:迭代器中会有多条数据
(map: Map[String, Iterable[OutOfLimitSpeedInfo]] )=> {
val list: List[OutOfLimitSpeedInfo] = map.get("start").get.toList
//既然是危险假设,这辆车的平均速度肯定很高,需要计算平均车速
var sum:Double = 0.0
var count = list.size
var sb = new StringBuilder
sb.append(s"当前车辆${list(0).car},涉嫌危险驾驶,它在两分钟内经过的卡口数是:${list.size},")
for (i<- 0 until list.size){
var info =list(i)
sum +=info.realSpeed
sb.append(s"第${i+1}个卡口${info.monitorId},车速为:${info.realSpeed},该卡口的限速为:${info.limitSpeed},")
}
var avg:Double = (sum/count).formatted("%.2f").toDouble
sb.append(s"它在两分钟内的平均车速为:${avg}")
new DangeroursDrivingInfo(list(0).car,sb.toString(),list.last.actionTime,avg)
}
)//.print()//测试
.addSink(new JdbcWriterDataSink[DangeroursDrivingInfo](classOf[DangeroursDrivingInfo]))
streamEnv.execute()
}
class OutOfSpeedFunction(baseLimitSpeed:Int) extends RichMapFunction[TrafficLog,OutOfLimitSpeedInfo]{
var map =scala.collection.mutable.Map[String,MonitorLimitInfo]() //map只用于判断是否超速的临时变量
override def open(parameters: Configuration): Unit = {
//把数据库所有的限速信息读取到Map集合中
var conn =DriverManager.getConnection("jdbc:mysql://node1:3306/traffic_monitor","root","123456")
var pst = conn.prepareStatement("SELECT monitor_id,road_id,speed_limit,area_id FROM t_monitor_info WHERE speed_limit>0")
var ret = pst.executeQuery()
while (ret.next()){
var info = new MonitorLimitInfo(ret.getString(1),
ret.getString(2),
ret.getInt(3),
ret.getString(4))
map.put(info.monitorId,info)
}
ret.close()
pst.close()
conn.close()
}
override def map(value: TrafficLog): OutOfLimitSpeedInfo = {
//如果集合中有当前卡口的限速,判断是否超速20%,如果集合中没有当前卡口的限速,城市道路中有一个基本限速60
val info: MonitorLimitInfo = map.getOrElse(value.monitorId,new MonitorLimitInfo(value.monitorId,value.roadId,baseLimitSpeed,value.areaId))
//封装结果数据对象
new OutOfLimitSpeedInfo(value.car,value.monitorId,value.roadId,value.speed,info.speedLimit,value.actionTime)
}
}
}