大数据全系列 教程
1869个小节阅读:466.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:Flink 读取自定义数据源,基于事件时间(EventTime)使用 5 秒滚动窗口,每 5 秒统计一次每个基站的通话总时长。
输出示例:
producer:sid_1,success,2030-12-28 16:28:41,4
producer:sid_1,success,2030-12-28 16:28:42,3
producer:sid_1,success,2030-12-28 16:28:43,8
producer:sid_1,success,2030-12-28 16:28:44,3
producer:sid_1,success,2030-12-28 16:28:45,7
window:[2030-12-28 16:28:40--2030-12-28 16:28:45]-<sid_1>--总时长:18
producer:sid_1,success,2030-12-28 16:28:46,2
producer:sid_1,success,2030-12-28 16:28:47,1
producer:sid_1,success,2030-12-28 16:28:48,1
producer:sid_1,success,2030-12-28 16:28:49,5
producer:sid_1,success,2030-12-28 16:28:50,6
window:[2030-12-28 16:28:45--2030-12-28 16:28:50]-<sid_1>--总时长:16
注意:基于事件时间(EventTime)进行窗口分析时,数据中必须包含事件时间字段,用来表示数据产生的时间。
在 com.itbaizhan.flink.scala.time 包下构建 Station 和 StationProducer:
xxxxxxxxxx
package com.itbaizhan.flink.scala.time
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
/** A single call record reported by a telecom base station.
  *
  * @param sid      base-station ID
  * @param callType call outcome, e.g. fail (call failed), busy (line busy), barring (rejected), success (connected)
  * @param callTime call timestamp, epoch milliseconds
  * @param duration call length in seconds
  */
case class Station(sid: String, callType: String, callTime: Long, duration: Long) {
  // Formatter used by toString to render callTime as a readable date-time
  private val timeFormat: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /** Renders the record as `sid,callType,yyyy-MM-dd HH:mm:ss,duration`. */
  override def toString: String =
    s"$sid,$callType,${timeFormat.format(callTime)},$duration"
}
/** Demo source that emits successful-call [[Station]] records until cancelled.
  *
  * Every tick it emits one record per station (`sid_1` .. `sid_<stationCount>`), stamped with the
  * current wall-clock time, then sleeps for `intervalMillis`.
  *
  * @param stationCount   number of stations emitted per tick; the default 1 reproduces the original
  *                       one-record-per-second behavior (use 2 to get two records per tick)
  * @param intervalMillis pause between ticks, in milliseconds
  */
class StationProducer(stationCount: Int = 1, intervalMillis: Long = 1000L) extends SourceFunction[Station] {
  require(stationCount >= 1, s"stationCount must be >= 1, got $stationCount")
  require(intervalMillis >= 0, s"intervalMillis must be >= 0, got $intervalMillis")

  // Whether to keep producing data. cancel() writes this flag from a different thread than the
  // one executing run(); @volatile makes that write visible to the loop so cancellation takes effect.
  @volatile private var isMakeData = true
  private val random = new Random()

  override def run(sourceContext: SourceFunction.SourceContext[Station]): Unit = {
    while (isMakeData) {
      (1 to stationCount).foreach { i =>
        val callTime = System.currentTimeMillis()
        // duration is a random whole number of seconds in [1, 9]
        val sl = Station("sid_" + i, "success", callTime, 1 + random.nextInt(9))
        println("producer:" + sl)
        // hand the record to the downstream operator
        sourceContext.collect(sl)
      }
      Thread.sleep(intervalMillis)
    }
  }

  /** Stops the run() loop after its current iteration. */
  override def cancel(): Unit = isMakeData = false
}
在同一包下构建 EventTimeWindowDemo 类:
xxxxxxxxxx
package com.itbaizhan.flink.scala.time
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Duration
/** Demo: sums call durations per base station in 5-second tumbling event-time windows.
  *
  * Records come from [[StationProducer]]. Event time is each record's `callTime` field.
  * Each fired window prints one line of the form
  * `window:[<start>--<end>]-<sid>--总时长:<sum>`.
  */
object EventTimeWindowDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    // Use the custom data source
    val stationDS: DataStream[Station] = env.addSource(new StationProducer)

    // Zero tolerated out-of-orderness: data disorder and lateness are deliberately ignored here.
    // The explicit [Station] type argument matters in Scala: without it, T of this generic Java
    // factory is inferred as Nothing, and withTimestampAssigner rejects a
    // SerializableTimestampAssigner[Station].
    val watermarkStrategy: WatermarkStrategy[Station] = WatermarkStrategy
      .forBoundedOutOfOrderness[Station](Duration.ofSeconds(0))
      .withTimestampAssigner(new SerializableTimestampAssigner[Station] {
        // Event time must be returned as a Long: here the record's callTime
        override def extractTimestamp(element: Station, recordTimestamp: Long): Long =
          element.callTime
      })

    stationDS
      .filter(_.callType == "success")
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .keyBy(_.sid) // group by base-station id
      // event-time tumbling window, size 5 s
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // apply() instead of sum("duration") so the output can include the window bounds
      .apply(new WindowFunction[Station, String, String, TimeWindow] {
        private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

        override def apply(key: String, window: TimeWindow, input: Iterable[Station], out: Collector[String]): Unit = {
          // window start/end timestamps, rendered as date-times
          val start = format.format(window.getStart)
          val end = format.format(window.getEnd)
          // total call duration of all records in this window
          val sum: Long = input.iterator.map(_.duration).sum
          out.collect(s"window:[$start--$end]-<$key>--总时长:$sum")
        }
      })
      .print()

    env.execute("EventTimeWindowDemo")
  }
}