大数据全系列 教程
1869个小节阅读:464.8k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速,为了统计实时的平均车速,这里设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量 ;并且在Flume采集数据的时候,我们发现数据可能出现时间乱序问题,最长迟到5秒。
实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:
xxxxxxxxxx
-- Result table: per-window average speed for each monitor (checkpoint) gate.
DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
-- Surrogate primary key.
`id` int(11) NOT NULL AUTO_INCREMENT,
-- Window start/end written from TimeWindow.getStart/getEnd (epoch milliseconds).
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
-- Monitor gate id the stream was keyed by.
`monitor_id` varchar(255) DEFAULT NULL,
-- Average speed = sum of speeds / number of cars within the window.
`avg_speed` double DEFAULT NULL,
-- Number of cars that passed the gate during the window.
`car_count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
修改全局常量类GlobalConstant,追加如下内容:
xxxxxxxxxx
/** Average vehicle speed observed at one monitor gate over one window.
  *
  * @param start     window start time (epoch milliseconds)
  * @param end       window end time (epoch milliseconds)
  * @param monitorId id of the monitor gate
  * @param avgSpeed  average speed of the vehicles that passed in the window
  * @param carCount  number of vehicles that passed in the window
  */
case class AvgSpeedInfo(start: Long,
                        end: Long,
                        monitorId: String,
                        avgSpeed: Double,
                        carCount: Int)
代码实现计算结果并输出到控制台:
xxxxxxxxxx
package com.itbaizhan.traffic.monitor
import com.itbaizhan.traffic.util.{AvgSpeedInfo, TrafficLog}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties
/** Computes the real-time average speed per monitor gate over a sliding
  * event-time window (size 5 min, slide 1 min), tolerating up to 5 seconds
  * of out-of-order data, and prints the result to stderr.
  */
object AvgSpeedMonitor {
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation conversions required by the Scala DataStream API.
    import org.apache.flink.streaming.api.scala._
    // Parallelism should normally match the Kafka topic's partition count.
    streamEnv.setParallelism(3)
    // Build the Kafka source.
    val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]()
      // Kafka brokers
      .setBootstrapServers("node2:9092,node3:9092,node4:9092")
      // topic to consume
      .setTopics("traffic-topic")
      // consumer group
      .setGroupId("gp3")
      // start from the latest offsets
      .setStartingOffsets(OffsetsInitializer.latest())
      // value deserializer
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
    // Watermarks are assigned after parsing (below), once the event time field is available.
    val dataDS: DataStream[String] = streamEnv.fromSource(kafkaSource,
      WatermarkStrategy.noWatermarks(),
      "KafkaSource")
    // Drop malformed records: a valid log line has exactly 7 comma-separated fields.
    val trafficLogDS: DataStream[TrafficLog] = dataDS.filter(_.split(",").length == 7)
      // Parse each line into a TrafficLog.
      .map(line => {
        val arrs = line.split(",")
        TrafficLog(arrs(0).toLong, arrs(1), arrs(2), arrs(3), arrs(4).toDouble, arrs(5), arrs(6))
      })
      .assignTimestampsAndWatermarks(
        // Events may arrive out of order by at most 5 seconds.
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
          // Extract the event time (must be epoch milliseconds as Long).
          .withTimestampAssigner(new SerializableTimestampAssigner[TrafficLog] {
            override def extractTimestamp(element: TrafficLog, recordTimestamp: Long): Long =
              element.actionTime
          })
      )
    // Key by monitor id to compute one average per gate.
    val resultDS: DataStream[AvgSpeedInfo] = trafficLogDS.keyBy(_.monitorId)
      // BUGFIX: use an EVENT-time sliding window. The original used
      // SlidingProcessingTimeWindows, which ignores the event timestamps and
      // watermarks assigned above, so the 5-second lateness tolerance never
      // applied and window bounds reflected wall-clock arrival time.
      .window(SlidingEventTimeWindows.of(
        Time.minutes(5), // size
        Time.minutes(1))) // slide
      // Incremental aggregation: accumulator = (sum of speeds, car count).
      .aggregate(
        // AggregateFunction<IN, ACC, OUT>
        new AggregateFunction[TrafficLog, (Double, Long), (Double, Long)] {
          // Empty accumulator.
          override def createAccumulator(): (Double, Long) = (0.0, 0L)
          // Fold one record in: add its speed, count one more car.
          override def add(value: TrafficLog, acc: (Double, Long)): (Double, Long) =
            (acc._1 + value.speed, acc._2 + 1)
          // Emit the accumulator as-is; the window function derives the average.
          override def getResult(acc: (Double, Long)): (Double, Long) = acc
          // Combine partial accumulators (e.g. for merging windows/sessions).
          override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
            (a._1 + b._1, a._2 + b._2)
        },
        // WindowFunction[IN, OUT, KEY, W <: Window]: turn the final accumulator
        // into an AvgSpeedInfo with the window bounds attached.
        new WindowFunction[(Double, Long), AvgSpeedInfo, String, TimeWindow] {
          override def apply(key: String, window: TimeWindow,
                             input: Iterable[(Double, Long)],
                             out: Collector[AvgSpeedInfo]): Unit = {
            // The iterable holds the single pre-aggregated accumulator.
            val (speedSum, carCount) = input.last
            // Round to 2 decimals. BUGFIX: the original used
            // .formatted("%.2f").toDouble, which is locale-dependent — in
            // locales with a comma decimal separator, toDouble throws.
            val avg: Double = math.round(speedSum / carCount * 100) / 100.0
            out.collect(AvgSpeedInfo(window.getStart, window.getEnd, key, avg, carCount.toInt))
          }
        }
      )
    // Print the results to stderr for testing.
    resultDS.printToErr().setParallelism(1)
    streamEnv.execute()
  }
}
接下来需要将结果写入到MySQL的表中,设置公用工具类:
xxxxxxxxxx
package com.itbaizhan.traffic.util
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}
/** Generic JDBC sink that writes results to MySQL. The statement to use is
  * selected by `classType`; currently only [[AvgSpeedInfo]] is supported,
  * other types fall through silently (extension point).
  *
  * @param classType runtime class of the records this sink instance handles
  */
class JdbcWriterDataSink[T](classType:Class[_<:T]) extends RichSinkFunction[T]{
  var conn :Connection =_
  var pst:PreparedStatement =_
  // Open one connection per parallel sink instance and prepare the statement
  // matching classType.
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.cj.jdbc.Driver")
    conn =DriverManager.getConnection("jdbc:mysql://node1:3306/traffic_monitor","root","123456")
    if(classType.getName.equals(classOf[AvgSpeedInfo].getName)){
      pst = conn.prepareStatement("insert into t_average_speed (start_time,end_time,monitor_id,avg_speed,car_count) values (?,?,?,?,?)")
    }// extend here for further result types
  }
  // BUGFIX: the original called pst.close()/conn.close() unconditionally.
  // pst stays null when classType is not AvgSpeedInfo (or when open() failed
  // before prepareStatement), so close() threw NPE; and a failure closing the
  // statement leaked the connection. Guard for null and close conn in finally.
  override def close(): Unit = {
    try {
      if (pst != null) pst.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
  // Insert one record; no-op for unsupported record types.
  override def invoke(value: T, context: SinkFunction.Context): Unit = {
    if(classType.getName.equals(classOf[AvgSpeedInfo].getName)){
      val info: AvgSpeedInfo = value.asInstanceOf[AvgSpeedInfo]
      pst.setLong(1,info.start)
      pst.setLong(2,info.end)
      pst.setString(3,info.monitorId)
      pst.setDouble(4,info.avgSpeed)
      pst.setInt(5,info.carCount)
      pst.executeUpdate()
    }// extend here for further result types
  }
}
修改AvgSpeedMonitor类:
xxxxxxxxxx
//resultDS.printToErr().setParallelism(1)
resultDS.addSink(new JdbcWriterDataSink[AvgSpeedInfo](classOf[AvgSpeedInfo]))
运行程序测试,记得启动Zookeeper集群和kafka集群以及运行CreateDataToKafka。