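Requirement: when monitoring detects a traffic violation or incident on the road (for example dangerous driving, a vehicle with cloned license plates, or a traffic accident), a traffic police officer is dispatched to handle it. Real-time violation data arrives via node1:8888, and officers' dispatch records are reported in real time via node2:8888. The task is to analyze how police respond to violations and to raise an alert for every violation that is not handled within the time limit.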
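The dispatch analysis works as follows.
Two streams are needed. For ease of demonstration, both are read from sockets.
The system's real-time stream of violating vehicles:
```scala
/** Sample class for a violating vehicle
  * @param car        license plate number
  * @param msg        violation description
  * @param createTime violation time (epoch milliseconds)
  */
case class ViolationInfo(car: String, msg: String, createTime: Long)
```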
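The real-time stream of traffic police dispatch records:
```scala
/** Sample class for a police dispatch record
  * @param policeId     officer ID
  * @param car          license plate of the violating vehicle
  * @param actionStatus dispatch status
  * @param actionTime   event time of the dispatch (epoch milliseconds)
  */
case class PoliceAction(policeId: String, car: String,
                        actionStatus: String, actionTime: Long)
```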
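The `map` steps in the programs below call `arr(2).toLong` and `arr(3).toLong` directly. A single malformed line typed into a socket therefore throws an exception and fails the job. Below is a minimal plain-Scala sketch of a more defensive parser. It assumes you would rather drop bad lines than fail the job. `LineParsers`, `parseViolation` and `parsePolice` are hypothetical names, and the case classes are copied from above so that the block compiles on its own:

```scala
import scala.util.Try

// Copies of the sample classes above, so this block is self-contained
case class ViolationInfo(car: String, msg: String, createTime: Long)
case class PoliceAction(policeId: String, car: String, actionStatus: String, actionTime: Long)

// Hypothetical helper: parse one CSV line, returning None instead of throwing
object LineParsers {
  // "car,msg,createTime" -> ViolationInfo
  def parseViolation(line: String): Option[ViolationInfo] =
    line.trim.split(",").map(_.trim) match {
      case Array(car, msg, ts) => Try(ts.toLong).toOption.map(t => ViolationInfo(car, msg, t))
      case _                   => None // wrong number of fields
    }

  // "policeId,car,actionStatus,actionTime" -> PoliceAction
  def parsePolice(line: String): Option[PoliceAction] =
    line.trim.split(",").map(_.trim) match {
      case Array(id, car, status, ts) => Try(ts.toLong).toOption.map(t => PoliceAction(id, car, status, t))
      case _                          => None // wrong number of fields
    }
}
```

In a Flink job these functions could replace the `map` calls, for example `.flatMap(line => LineParsers.parseViolation(line).toList)`. Converting the `Option` to a `List` keeps type inference for the Scala API's `flatMap` simple. Silently dropping lines hides bad input, so logging the rejects may be preferable in practice.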
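Modify `GlobalConstant` and add a global constant:
```scala
// Time allowed between a violation and the police dispatch, in milliseconds.
// The production value is 5 minutes; it is changed to 5 seconds for easier testing.
//val TIME_VIOLATION_CAR_AND_POLICE_ACTION = 5000 * 60
val TIME_VIOLATION_CAR_AND_POLICE_ACTION = 5000
```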
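The commented-out production value `5000*60` is 300,000 ms, i.e. 5 minutes. The test value 5000 is 5 seconds. Below is a small sketch that makes the units explicit with `java.util.concurrent.TimeUnit`. The object `TimeoutCheck` is a hypothetical name used only for illustration:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical object, only to check the arithmetic behind TIME_VIOLATION_CAR_AND_POLICE_ACTION
object TimeoutCheck {
  // The commented-out production value: 5000 * 60 ms
  val productionMs: Long = 5000L * 60
  // The same 5 minutes, with the unit spelled out
  val fiveMinutesMs: Long = TimeUnit.MINUTES.toMillis(5)
  // The test value used in this tutorial: 5 seconds
  val testMs: Long = TimeUnit.SECONDS.toMillis(5)
}
```

Writing the constant as `TimeUnit.MINUTES.toMillis(5)` would make the unit visible where it is defined. The tutorial's plain `Int` also works, because `Time.milliseconds` and the `Long` timestamp arithmetic widen it to `Long`.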
Implementation with intervalJoin. At this stage it can only output violating vehicles together with their matching dispatch records:
```scala
package com.itbaizhan.traffic.warning

import com.itbaizhan.traffic.util.{GlobalConstant, PoliceAction, ViolationInfo}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object ViolationCarAndPoliceActionAnalysis {
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Import the implicit conversions (TypeInformation, DataStream, ...)
    import org.apache.flink.streaming.api.scala._
    streamEnv.setParallelism(1)
    //TODO 1. Read the violating-vehicle stream
    val stream1: DataStream[ViolationInfo] = streamEnv.socketTextStream("node1", 8888)
      .map(line => {
        val arr = line.trim.split(",")
        ViolationInfo(arr(0), arr(1), arr(2).toLong)
      }).assignTimestampsAndWatermarks(
        // No watermarks: records still carry event timestamps for the join, but the
        // join's buffered state is never cleaned up (acceptable for a demo).
        // The explicit type parameter lets Scala type-check withTimestampAssigner.
        WatermarkStrategy.noWatermarks[ViolationInfo]()
          // Extract the event-time field from each record (must return a Long)
          .withTimestampAssigner(new SerializableTimestampAssigner[ViolationInfo] {
            override def extractTimestamp(element: ViolationInfo, recordTimestamp: Long): Long =
              element.createTime
          })
      )
    //TODO 2. Read the dispatch-record stream
    val stream2: DataStream[PoliceAction] = streamEnv.socketTextStream("node2", 8888)
      .map(line => {
        val arr = line.trim.split(",")
        PoliceAction(arr(0), arr(1), arr(2), arr(3).toLong)
      }).assignTimestampsAndWatermarks(
        WatermarkStrategy.noWatermarks[PoliceAction]()
          // Extract the event-time field from each record (must return a Long)
          .withTimestampAssigner(new SerializableTimestampAssigner[PoliceAction] {
            override def extractTimestamp(element: PoliceAction, recordTimestamp: Long): Long =
              element.actionTime
          })
      )
    //TODO 3. Join the two streams on ViolationInfo.car = PoliceAction.car
    stream1.keyBy(_.car)
      // intervalJoin behaves like an SQL inner join; it works only on KeyedStreams, not on plain DataStreams
      .intervalJoin(stream2.keyBy(_.car))
      // A dispatch up to 5 minutes after the violation matches, bounds included (5000 ms for testing).
      // Records with the same plate whose timestamps fall inside this bound are joined automatically.
      .between(Time.seconds(0),
        Time.milliseconds(GlobalConstant.TIME_VIOLATION_CAR_AND_POLICE_ACTION))
      .process(new ProcessJoinFunction[ViolationInfo, PoliceAction, String] {
        override def processElement(left: ViolationInfo, right: PoliceAction,
                                    ctx: ProcessJoinFunction[ViolationInfo, PoliceAction, String]#Context,
                                    out: Collector[String]): Unit = {
          // This pair has already been matched by the join
          out.collect(s"Violating vehicle: ${left.car}, police already dispatched, officer ID: ${right.policeId}, " +
            s"dispatch status: ${right.actionStatus}, dispatch time: ${right.actionTime}")
        }
      })
      .print()
    streamEnv.execute("Violating vehicles and police dispatch analysis")
  }
}
```
Test data for violating vehicles (涉嫌套牌 means "suspected cloned plate"):
```
京E32638,涉嫌套牌,1975710081000
京E32636,涉嫌套牌,1975710082000
京E32635,涉嫌套牌,1975710083000
```
Test data for dispatch records (处理完成 means "handling completed"):
```
J0001,京E32638,处理完成,1975710085000
J0002,京E32636,处理完成,1975710089000
J0003,京NJA688,处理完成,1975710090000
```
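`between(lower, upper)` matches a dispatch record `p` with a violation `v` of the same plate when `v.createTime + lower <= p.actionTime <= v.createTime + upper`. Both bounds are inclusive by default. The plain-Scala sketch below evaluates only that predicate over the two test data sets above and ignores watermarks and state. `IntervalJoinModel` and `SampleData` are hypothetical helpers, not Flink APIs:

```scala
// Copies of the tutorial's sample classes so this block compiles on its own
case class ViolationInfo(car: String, msg: String, createTime: Long)
case class PoliceAction(policeId: String, car: String, actionStatus: String, actionTime: Long)

// Hypothetical batch model of intervalJoin(...).between(lower, upper) on finite lists
object IntervalJoinModel {
  // Same key, and the dispatch time lies in [createTime + lower, createTime + upper], both ends inclusive
  def joins(v: ViolationInfo, p: PoliceAction, lowerMs: Long, upperMs: Long): Boolean =
    v.car == p.car &&
      p.actionTime >= v.createTime + lowerMs &&
      p.actionTime <= v.createTime + upperMs

  // Inner join: only pairs that satisfy the bound are produced; unmatched records are dropped
  def innerJoin(vs: Seq[ViolationInfo], ps: Seq[PoliceAction],
                lowerMs: Long, upperMs: Long): Seq[(ViolationInfo, PoliceAction)] =
    for {
      v <- vs
      p <- ps
      if joins(v, p, lowerMs, upperMs)
    } yield (v, p)
}

// The test data from this section
object SampleData {
  val violations = Seq(
    ViolationInfo("京E32638", "涉嫌套牌", 1975710081000L),
    ViolationInfo("京E32636", "涉嫌套牌", 1975710082000L),
    ViolationInfo("京E32635", "涉嫌套牌", 1975710083000L))
  val actions = Seq(
    PoliceAction("J0001", "京E32638", "处理完成", 1975710085000L),
    PoliceAction("J0002", "京E32636", "处理完成", 1975710089000L),
    PoliceAction("J0003", "京NJA688", "处理完成", 1975710090000L))
}
```

With the 5000 ms test bound, `innerJoin(SampleData.violations, SampleData.actions, 0L, 5000L)` matches only 京E32638, whose dispatch comes 4 s after the violation. 京E32636's dispatch comes 7 s later, so it matches only if the upper bound is raised to at least 7000 ms. 京E32635 and 京NJA688 have no partner and never appear in the output.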
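So far the program only outputs violations that received a dispatch within 5 minutes (an inner join). Violations without a dispatch, and dispatches without a violation, are silently dropped. The upgraded program: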
```scala
package com.itbaizhan.traffic.warning

import com.itbaizhan.traffic.util.{GlobalConstant, PoliceAction, ViolationInfo}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import java.time.Duration

/** 1. A violation arrives and a dispatch exists within 5 minutes: print the dispatch to the console (main output).
  * 2. A violation arrives and no dispatch follows within 5 minutes: emit a dispatch alert (first side output).
  * 3. A dispatch record exists, but its violation was not reported by the monitoring platform (second side output).
  */
object ViolationCarAndPoliceActionAnalysis2 {
  // Side-output tags for case 2 (no dispatch) and case 3 (no matching violation)
  val secondTag = new OutputTag[ViolationInfo]("No Police Action")
  val threeTag = new OutputTag[PoliceAction]("No Violation")

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1 makes the results easier to follow
    streamEnv.setParallelism(1)
    //TODO 1. Read the violating-vehicle stream
    val stream1: DataStream[ViolationInfo] = streamEnv.socketTextStream("node1", 8888)
      .map(line => {
        val arr = line.trim.split(",")
        ViolationInfo(arr(0), arr(1), arr(2).toLong)
      }).assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness[ViolationInfo](Duration.ofSeconds(0))
          // Extract the event-time field from each record (must return a Long)
          .withTimestampAssigner(new SerializableTimestampAssigner[ViolationInfo] {
            override def extractTimestamp(element: ViolationInfo, recordTimestamp: Long): Long =
              element.createTime
          })
      )
    //TODO 2. Read the dispatch-record stream (node2:8888, as stated in the requirement)
    val stream2: DataStream[PoliceAction] = streamEnv.socketTextStream("node2", 8888)
      .map(line => {
        val arr = line.trim.split(",")
        PoliceAction(arr(0), arr(1), arr(2), arr(3).toLong)
      }).assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness[PoliceAction](Duration.ofSeconds(0))
          // Extract the event-time field from each record (must return a Long)
          .withTimestampAssigner(new SerializableTimestampAssigner[PoliceAction] {
            override def extractTimestamp(element: PoliceAction, recordTimestamp: Long): Long =
              element.actionTime
          })
      )
    //TODO 3. Put both record types with the same plate into the same key group
    val result: DataStream[String] = stream1.keyBy(_.car)
      .connect(stream2.keyBy(_.car))
      .process(new JoinStreamFunction)
    result.getSideOutput(secondTag).print("Case 2:")
    result.getSideOutput(threeTag).print("Case 3:")
    // Print the main output (case 1)
    result.print("main")
    streamEnv.execute("Violating vehicles and police dispatch analysis 2")
  }

  class JoinStreamFunction extends KeyedCoProcessFunction[String, ViolationInfo, PoliceAction, String] {
    // Two value states per plate: the pending violation and the pending dispatch record
    lazy val violationState: ValueState[ViolationInfo] = getRuntimeContext.getState(
      new ValueStateDescriptor[ViolationInfo]("violationState", classOf[ViolationInfo]))
    lazy val policeActionState: ValueState[PoliceAction] = getRuntimeContext.getState(
      new ValueStateDescriptor[PoliceAction]("policeState", classOf[PoliceAction]))

    // Called for every ViolationInfo record from the violation stream
    override def processElement1(value: ViolationInfo,
                                 ctx: KeyedCoProcessFunction[String, ViolationInfo, PoliceAction, String]#Context,
                                 out: Collector[String]): Unit = {
      println("processElement1->" + value)
      // Look up a pending dispatch record for the same plate
      val policeAction = policeActionState.value()
      if (policeAction == null) {
        // A violation without a matching dispatch yet: keep it in state and register an
        // event-time timer that fires after the time limit (5 s here) unless deleted first
        violationState.update(value)
        val timerTs = value.createTime + GlobalConstant.TIME_VIOLATION_CAR_AND_POLICE_ACTION
        ctx.timerService().registerEventTimeTimer(timerTs)
        println("processElement1 add timer:" + timerTs)
      } else {
        // The dispatch record arrived first: emit the match
        out.collect(s"1: vehicle ${value.car}, police dispatched, officer ID: ${policeAction.policeId}, " +
          s"dispatch status: ${policeAction.actionStatus}, dispatch time: ${policeAction.actionTime}")
        // Delete the timer that was registered for the dispatch record
        val timerTs = policeAction.actionTime + GlobalConstant.TIME_VIOLATION_CAR_AND_POLICE_ACTION
        ctx.timerService().deleteEventTimeTimer(timerTs)
        println("processElement1 delete timer:" + timerTs)
        // Clear the dispatch-record state
        policeActionState.clear()
      }
    }

    // Called for every PoliceAction record from the dispatch stream
    override def processElement2(value: PoliceAction,
                                 ctx: KeyedCoProcessFunction[String, ViolationInfo, PoliceAction, String]#Context,
                                 out: Collector[String]): Unit = {
      println("processElement2->" + value)
      // Look up a pending violation for the same plate
      val vioInfo: ViolationInfo = violationState.value()
      if (vioInfo == null) {
        // A dispatch record without a matching violation yet: keep it in state and
        // register a timer that fires after the time limit unless deleted first
        policeActionState.update(value)
        val timerTs = value.actionTime + GlobalConstant.TIME_VIOLATION_CAR_AND_POLICE_ACTION
        ctx.timerService().registerEventTimeTimer(timerTs)
        println("processElement2 add timer:" + timerTs)
      } else {
        // The violation arrived first: emit the match
        out.collect(s"2: vehicle ${vioInfo.car}, police dispatched, officer ID: ${value.policeId}, " +
          s"dispatch status: ${value.actionStatus}, dispatch time: ${value.actionTime}")
        // Delete the timer that was registered for the violation
        val timerTs = vioInfo.createTime + GlobalConstant.TIME_VIOLATION_CAR_AND_POLICE_ACTION
        ctx.timerService().deleteEventTimeTimer(timerTs)
        println("processElement2 delete timer:" + timerTs)
        // Clear the violation state
        violationState.clear()
      }
    }

    // Called when an event-time timer fires, i.e. the watermark has reached its timestamp
    override def onTimer(timestamp: Long,
                         ctx: KeyedCoProcessFunction[String, ViolationInfo, PoliceAction, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      println("onTimer fired, timestamp:" + timestamp)
      // Read whatever is still pending for this plate
      val vi = violationState.value()
      val pai = policeActionState.value()
      // Case 2: a violation with no dispatch within the time limit
      if (vi != null) {
        ctx.output(secondTag, vi)
      }
      // Case 3: a dispatch record whose violation was not reported by the platform
      if (pai != null) {
        ctx.output(threeTag, pai)
      }
      // Clear both states
      violationState.clear()
      policeActionState.clear()
    }
  }
}
```
Test data for violating vehicles:
```
京E32638,涉嫌套牌,1893427261000
京E32636,涉嫌套牌,1893427266000
京P32634,涉嫌套牌,1893427276000
京P32633,涉嫌套牌,1893427280000
京P32632,涉嫌套牌,1893427285000
京P32631,涉嫌套牌,1893427296000
```
Test data for dispatch records:
```
J0001,京E32638,处理完成,1893427265000
J0002,京NJA68X,处理完成,1893427271000
J0003,京E32636,处理完成,1893427277000
J0004,京NJA688,处理完成,1893427281000
J0005,京NJA681,处理完成,1893427283000
J0006,京NJA680,处理完成,1893427290000
```
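What the upgraded program prints depends on how the two socket inputs interleave and on when watermarks advance. The plain-Scala model below imitates `JoinStreamFunction` under two assumptions. First, records are entered in event-time order across both sockets. Second, the combined watermark is recomputed right after every record as the minimum over both inputs of (largest timestamp seen - 1 ms), which mirrors `forBoundedOutOfOrderness(Duration.ofSeconds(0))`. Real Flink emits watermarks periodically, so actual timers can fire later than in this model. `JoinStreamModel`, `onViolation` and `onPolice` are hypothetical names:

```scala
import scala.collection.mutable

// Copies of the tutorial's sample classes so this block compiles on its own
case class ViolationInfo(car: String, msg: String, createTime: Long)
case class PoliceAction(policeId: String, car: String, actionStatus: String, actionTime: Long)

// Hypothetical single-threaded model of JoinStreamFunction; no Flink involved.
// Maps stand in for keyed ValueState, a set of (timestamp, key) pairs for event-time timers.
class JoinStreamModel(timeoutMs: Long) {
  val mainOut = mutable.Buffer[String]()               // case 1: matched "plate:officer" pairs
  val noPoliceAction = mutable.Buffer[ViolationInfo]() // case 2 side output
  val noViolation = mutable.Buffer[PoliceAction]()     // case 3 side output

  private val violationState = mutable.Map[String, ViolationInfo]()
  private val policeState = mutable.Map[String, PoliceAction]()
  private val timers = mutable.Set[(Long, String)]()
  private var maxTs1: Option[Long] = None // largest violation timestamp seen
  private var maxTs2: Option[Long] = None // largest dispatch timestamp seen

  def onViolation(v: ViolationInfo): Unit = {
    policeState.remove(v.car) match {
      case Some(p) => // dispatch came first: emit the match and drop its timer
        mainOut += s"${v.car}:${p.policeId}"
        timers -= ((p.actionTime + timeoutMs, v.car))
      case None => // wait for a dispatch until createTime + timeout
        violationState(v.car) = v
        timers += ((v.createTime + timeoutMs, v.car))
    }
    maxTs1 = Some(maxTs1.fold(v.createTime)(math.max(_, v.createTime)))
    advanceWatermark()
  }

  def onPolice(p: PoliceAction): Unit = {
    violationState.remove(p.car) match {
      case Some(v) => // violation came first: emit the match and drop its timer
        mainOut += s"${p.car}:${p.policeId}"
        timers -= ((v.createTime + timeoutMs, p.car))
      case None => // dispatch without a known violation: wait until actionTime + timeout
        policeState(p.car) = p
        timers += ((p.actionTime + timeoutMs, p.car))
    }
    maxTs2 = Some(maxTs2.fold(p.actionTime)(math.max(_, p.actionTime)))
    advanceWatermark()
  }

  // Combined watermark = min over both inputs of (max timestamp - 1); fire every timer <= watermark
  private def advanceWatermark(): Unit = {
    val watermark = (maxTs1, maxTs2) match {
      case (Some(a), Some(b)) => math.min(a, b) - 1
      case _                  => Long.MinValue // an input without data holds the watermark back
    }
    val due = timers.filter(_._1 <= watermark).toList.sortBy(_._1)
    due.foreach { timer =>
      timers -= timer
      onTimer(timer._2)
    }
  }

  // Like onTimer in JoinStreamFunction: anything still pending for the key is unmatched
  private def onTimer(key: String): Unit = {
    violationState.remove(key).foreach(v => noPoliceAction += v)
    policeState.remove(key).foreach(p => noViolation += p)
  }
}
```

Feeding the test data above in timestamp order into `new JoinStreamModel(5000L)` produces the following in this model:

- Matches: 京E32638 and 京E32636.
- Case 2: 京P32634 and 京P32633.
- Case 3: J0002, J0004 and J0005.

The remaining timers have not fired yet, because the watermark has not passed them. Note that 京E32636 is matched even though its dispatch comes 11 s after the violation. Its 5 s timer only fires once the watermark of both inputs reaches it, and that had not happened when J0003 arrived.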
Extension: violation records come from traffic-topicA, and dispatch records come from traffic-topicB.
Create the topics:
```shell
kafka-topics.sh --bootstrap-server node3:9092 --replication-factor 2 --partitions 3 --create --topic traffic-topicA
kafka-topics.sh --bootstrap-server node3:9092 --replication-factor 2 --partitions 3 --create --topic traffic-topicB
```
Delete the topics:
```shell
kafka-topics.sh --bootstrap-server node3:9092 --delete --topic traffic-topicA
kafka-topics.sh --bootstrap-server node3:9092 --delete --topic traffic-topicB
```
Produce data (one console producer per topic, each in its own terminal):
```shell
kafka-console-producer.sh --bootstrap-server node3:9092 --topic traffic-topicA
```
```shell
kafka-console-producer.sh --bootstrap-server node3:9092 --topic traffic-topicB
```
Reference code: ViolationCarAndPoliceActionAnalysis3