大数据全系列 教程
1869个小节阅读:464.9k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
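需求:计算每分钟内平均车速排名前3名的卡口信息,并将排名结果格式化成字符串,便于后续输出。实现思路:先按卡口编号分组,用滑动窗口(长度5分钟、步长1分钟,测试时改为5秒/1秒)增量统计各卡口的车速之和与车辆数并求出平均车速;再用1分钟的滚动窗口汇总所有卡口,每个卡口保留最大的平均车速,按降序取前3名。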
代码实现:
xxxxxxxxxx
package com.itbaizhan.traffic.monitor
import com.itbaizhan.traffic.util.{AvgSpeedInfo, TrafficLog}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.lang
import java.time.Duration
import scala.util.Try
/**
 * Flink job: per minute, rank traffic monitors (checkpoints) by average vehicle speed and print
 * the top 3 as a formatted string.
 *
 * Pipeline:
 *  1. Read raw comma-separated lines from the Kafka topic `traffic-topic`.
 *  2. Parse each line into a [[TrafficLog]]. Lines without exactly 7 fields, or whose numeric
 *     fields do not parse, are dropped instead of failing the job.
 *  3. Use `TrafficLog.actionTime` as event time, tolerating up to 5 s of out-of-order data.
 *  4. Per monitor, sum speeds and count vehicles in sliding event-time windows and emit an
 *     [[AvgSpeedInfo]] whose average speed is rounded to 2 decimals.
 *  5. Across all monitors, in 1-minute tumbling event-time windows, keep each monitor's highest
 *     average and print the 3 monitors with the highest averages.
 */
object TopNAvgSpeedMonitor {

  /** Number of comma-separated fields a valid traffic log line must have. */
  private val LogFieldCount: Int = 7

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation derivation and the Scala DataStream API.
    import org.apache.flink.streaming.api.scala._
    // Recommended to equal the number of partitions of the Kafka topic.
    streamEnv.setParallelism(3)

    val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
      .setBootstrapServers("node2:9092,node3:9092,node4:9092")
      .setTopics("traffic-topic")
      .setGroupId("gp3")
      // Start consuming from the latest offsets.
      .setStartingOffsets(OffsetsInitializer.latest())
      // Only the record value is used, decoded as a UTF-8 string.
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    // No watermarks here: they are assigned after parsing, once the event time is available.
    val dataDS: DataStream[String] =
      streamEnv.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource")

    val trafficLogDS: DataStream[TrafficLog] = dataDS
      // Drop illegal lines (wrong field count or unparsable numbers) instead of crashing the job.
      .flatMap((line: String) => parseTrafficLog(line).toList)
      .assignTimestampsAndWatermarks(
        // Data may arrive out of order, usually by no more than 5 seconds.
        // The explicit type argument keeps Scala from inferring WatermarkStrategy[Nothing].
        WatermarkStrategy.forBoundedOutOfOrderness[TrafficLog](Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[TrafficLog] {
            // Event time comes from the log record itself (presumably epoch millis - TODO confirm).
            override def extractTimestamp(element: TrafficLog, recordTimestamp: Long): Long =
              element.actionTime
          })
      )

    // Average speed per monitor, so the stream is keyed by monitor id.
    // Event-time windows are used so the watermark strategy above actually takes effect.
    val avgDS: DataStream[AvgSpeedInfo] = trafficLogDS
      .keyBy(_.monitorId)
      .window(SlidingEventTimeWindows.of(
        // Production sizing: Time.minutes(5) (size), Time.minutes(1) (slide).
        Time.seconds(5), // size - seconds are used for testing
        Time.seconds(1))) // slide - seconds are used for testing
      .aggregate(
        // Incremental pre-aggregation; accumulator = (sum of speeds, number of vehicles).
        new AggregateFunction[TrafficLog, (Double, Long), (Double, Long)] {
          override def createAccumulator(): (Double, Long) = (0.0, 0L)

          // Add this vehicle's speed and count it once.
          override def add(value: TrafficLog, acc: (Double, Long)): (Double, Long) =
            (acc._1 + value.speed, acc._2 + 1)

          override def getResult(acc: (Double, Long)): (Double, Long) = acc

          // Combines two partial accumulators; Flink calls this for merging window assigners
          // (e.g. session windows), not for the sliding windows used here.
          override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
            (a._1 + b._1, a._2 + b._2)
        },
        // Turns the final accumulator of a window into an AvgSpeedInfo.
        new WindowFunction[(Double, Long), AvgSpeedInfo, String, TimeWindow] {
          override def apply(key: String, window: TimeWindow, input: Iterable[(Double, Long)],
                             out: Collector[AvgSpeedInfo]): Unit = {
            // After incremental aggregation the window function receives exactly one element.
            val (speedSum, carCount) = input.last
            val avg: Double = roundTo2(speedSum / carCount)
            out.collect(new AvgSpeedInfo(window.getStart, window.getEnd, key, avg, carCount.toInt))
          }
        }
      )

    // Window results are emitted with timestamp window.maxTimestamp() (= end - 1), and the
    // upstream watermarks flow through the window operator. The 1-minute windows below can
    // therefore consume them directly. Re-stamping with the exclusive window end would move
    // windows that end on a minute boundary into the next minute.
    avgDS
      .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
      .apply(new AllWindowFunction[AvgSpeedInfo, String, TimeWindow] {
        private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

        // `input` holds every per-monitor sliding-window result that falls into this minute.
        override def apply(window: TimeWindow, input: Iterable[AvgSpeedInfo], out: Collector[String]): Unit = {
          val startTime = format.format(window.getStart)
          val endTime = format.format(window.getEnd)
          val header = s"起止时间[${startTime},${endTime}]内,平均车速最高的三个卡口:\n"
          val body = topNByAvgSpeed(input, 3)
            .map(avgSI => s"(卡口编号:${avgSI.monitorId},平均车速:${avgSI.avgSpeed},经过车辆数:${avgSI.carCount})\n")
            .mkString
          out.collect(header + body)
        }
      })
      .print()
      .setParallelism(1)

    streamEnv.execute()
  }

  /**
   * Parses one raw log line into a [[TrafficLog]].
   *
   * @param line comma-separated record; field 0 must parse as a Long and field 4 as a Double,
   *             the other five fields are passed through as strings
   * @return the parsed record, or None when the line does not have exactly [[LogFieldCount]]
   *         fields or a numeric field fails to parse
   */
  private def parseTrafficLog(line: String): Option[TrafficLog] = {
    val fields = line.split(",")
    if (fields.length != LogFieldCount) None
    else Try(TrafficLog(fields(0).toLong, fields(1), fields(2), fields(3), fields(4).toDouble, fields(5), fields(6))).toOption
  }

  /**
   * Rounds a value to 2 decimal places (HALF_UP), independent of the JVM default locale.
   * NaN and infinities are returned unchanged, because BigDecimal cannot represent them.
   */
  private def roundTo2(value: Double): Double =
    if (value.isNaN || value.isInfinity) value
    else BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble

  /**
   * Selects the monitors with the highest average speed.
   *
   * For each monitorId only the record with the largest avgSpeed is kept; on ties the first one
   * encountered wins. The survivors are sorted by avgSpeed in descending order, and at most `n`
   * are returned. The relative order of different monitors with equal speed is unspecified.
   */
  private def topNByAvgSpeed(infos: Iterable[AvgSpeedInfo], n: Int): List[AvgSpeedInfo] =
    infos
      .groupBy(_.monitorId)
      .values
      .map(_.maxBy(_.avgSpeed))
      .toList
      .sortBy(_.avgSpeed)(Ordering.Double.reverse)
      .take(n)
}