大数据全系列 教程
1869个小节阅读:468k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:实时车辆分布情况,每隔5秒统计最近1分钟时间内整个城市中每个区分布多少量车。这里要注意车辆的去重,因为在1分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。
xxxxxxxxxx
package com.itbaizhan.traffic.distribution
import com.itbaizhan.traffic.util.TrafficLog
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Duration
object CarAreaDistribution {
def main(args: Array[String]): Unit = {
//TODO 1.从kafka读取日志数据
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//设置并行度 推荐设置主题的分区相同
streamEnv.setParallelism(1)
//构建KafkaSource对象
val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
//设置kafka brokers
.setBootstrapServers("node2:9092,node3:9092,node4:9092")
//设置消费的主题
.setTopics("traffic-topic")
//设置消费者组
.setGroupId("gp6")
//设置消费偏移量:latest
.setStartingOffsets(OffsetsInitializer.latest())
//设置返序列化类
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
//设置kafka数据源
val dataDS : DataStream[String] = streamEnv.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(),//暂不设置水位线
"KafkaSource")
//处理非法数据和设置按照事件时间进行处理
val trafficLogDS: DataStream[TrafficLog] = dataDS.filter(_.split(",").length == 7)
//数据类型转换
.map(line => {
val arrs = line.split(",")
TrafficLog(arrs(0).toLong, arrs(1), arrs(2), arrs(3), arrs(4).toDouble, arrs(5), arrs(6))
})
.assignTimestampsAndWatermarks(
//数据出现乱序,一般不超过2秒
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
//从日志数据对象中,提取事件时间列
.withTimestampAssigner(new SerializableTimestampAssigner[TrafficLog] {
override def extractTimestamp(element: TrafficLog, recordTimestamp: Long): Long = {
//返回值必须是Long行
element.actionTime
}
})
)
//按照区域进行分组
val keyedStream: KeyedStream[TrafficLog, String] = trafficLogDS.keyBy(_.areaId)
//每隔5秒统计最近1分钟(测试10秒)时间内整个城市中每个区分布多少量车
keyedStream.window(SlidingProcessingTimeWindows.of(
Time.seconds(10),Time.seconds(5)))
//[IN, OUT, KEY, W <: Window]
.process(new ProcessWindowFunction[TrafficLog,String,String,TimeWindow] {
//定义时间转化类
private val fdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
override def process(key: String, context: Context,
elements: Iterable[TrafficLog], out: Collector[String]): Unit = {
//获取窗口的开始时间
val startTime:String = this.fdf.format(context.window.getStart)
//获取窗口的结束时间
val endTIme:String = this.fdf.format(context.window.getEnd)
//利用set可以去重的特点
val set = scala.collection.mutable.Set[String]()
for(elem <- elements){
set.add(elem.car)
}
out.collect(s"开始时间:${startTime} - 结束时间:${endTIme},区域ID:${key},车辆总数 = ${set.size}")
}
}).print()
streamEnv.execute("车辆分布情况")
}
}