大数据全系列 教程
1869个小节阅读:468.1k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
默认情况下,当watermark超过end-of-window之后,再有之前的数据到达时,这些数据会被删除。为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。
简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。
延迟数据
是指:在当前窗口【假设窗口范围为00 - 05】已经计算之后,又来了一个属于该窗口的数据【假设事件时间为04】,这时候仍会触发window操作
,这种数据就称为延迟数据。
Allowed Lateness 机制允许用户设置一个允许的最大迟到时⻓。
案例演示:
Watermark为0forBoundedOutOfOrderness(Duration.ofSeconds(0))
,并且allowedLateness为0
[root@node3 ~]# nc -lk 8888 sid_1,success,2030-12-01 10:10:01,1 sid_1,success,2030-12-01 10:10:02,1 sid_1,success,2030-12-01 10:10:03,1 sid_1,success,2030-12-01 10:10:04,1 sid_1,success,2030-12-01 10:10:05,1
window:[2030-12-01 10:10:00--2030-12-01 10:10:05]-<sid_1>--总时长:4
sid_1,success,2030-12-01 10:10:04.989,1
sid_1,success,2030-12-01 10:10:08,1 sid_1,success,2030-12-01 10:10:10,1window:[2030-12-01 10:10:05--2030-12-01 10:10:10]-<sid_1>--总时长:2
Watermark为0forBoundedOutOfOrderness(Duration.ofSeconds(0))
,
allowedLateness为2allowedLateness(Time.seconds(2))
[root@node3 ~]# nc -lk 8888 sid_1,success,2030-12-01 10:10:01,1 sid_1,success,2030-12-01 10:10:02,1 sid_1,success,2030-12-01 10:10:03,1 sid_1,success,2030-12-01 10:10:04,1 sid_1,success,2030-12-01 10:10:05,1
window:[2030-12-01 10:10:00--2030-12-01 10:10:05]-<sid_1>--总时长:4
sid_1,success,2030-12-01 10:10:04.456,1
window:[2030-12-01 10:10:00--2030-12-01 10:10:05]-<sid_1>--总时长:5
sid_1,success,2030-12-01 10:10:04.888,1
window:[2030-12-01 10:10:00--2030-12-01 10:10:05]-<sid_1>--总时长:6
sid_1,success,2030-12-01 10:10:07,1
sid_1,success,2030-12-01 10:10:04.989,1
sid_1,success,2030-12-01 10:10:10,1
window:[2030-12-01 10:10:05--2030-12-01 10:10:10]-<sid_1>--总时长:2
代码:
xxxxxxxxxx
package com.itbaizhan.flink.scala.time
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Duration
object EventTimeWindowAllowedLatenessDemo {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
//使用node3:8888传递过来的数据
val lineDS: DataStream[String] = env.socketTextStream("node3", 8888)
//过滤空行
val stationDS: DataStream[Station] = lineDS.filter(_.trim.length > 0)
//类型转换
.map(line => {
val array = line.trim.split(",")
Station(array(0), array(1), format.parse(array(2)).getTime, array(3).toLong)
})
stationDS.filter(_.callType.equals("success"))
.assignTimestampsAndWatermarks(
WatermarkStrategy
//不考虑数据乱序
.forBoundedOutOfOrderness(Duration.ofSeconds(0))
//从日志数据对象中,提取事件时间列
.withTimestampAssigner(new SerializableTimestampAssigner[Station] {
//element:流中一条数据对象
override def extractTimestamp(element: Station, recordTimestamp: Long): Long = {
//返回值必须是Long行
element.callTime
}
})
).keyBy(_.sid)//按照基站id分组
//设置窗口:事件时间窗口,并且是滚动窗口,windowSize = 5s
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//TODO 设置数据允许延迟的最大等待时间为2秒:当窗口被触发计算以后,在等待的时间范围内,上次计算的窗口内容的数据再次到达,再次触发窗口计算
.allowedLateness(Time.seconds(2))
//IN, OUT, KEY, W <: Window]
.apply(new WindowFunction[Station,String,String,TimeWindow] {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
override def apply(key: String, window: TimeWindow, input: Iterable[Station], out: Collector[String]): Unit = {
//获取窗体的开始结束时间
val start = this.format.format(window.getStart)
val end = this.format.format(window.getEnd)
//对窗口中通话时长求和
var sum:Long = 0L
//遍历求和
for(ele<-input.iterator){
sum += ele.duration
}
out.collect("window:["+start+"--"+end+"]-<"+key+">--总时长:"+sum)
}
}).print()
env.execute("EventTimeWindowAllowedLatenessDemo")
}
}