大数据全系列 教程
1869个小节阅读:467.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例需求:
1.每隔5秒统计最近5秒钟每个基站的日志数量(滚动窗口)
2.每隔5秒统计最近10秒钟每个基站的日志数量(滑动窗口)
3.每隔5秒统计最近5秒钟每个基站的日志数量,显示窗口的开始结束时间(滚动窗口)
编码实现:
xpackage com.itbaizhan.flink.scala.window
import com.itbaizhan.flink.scala.source.{MyDefinedSource, StationLog}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TimeWindowDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
//调用自定义数据源
val stationLogDS: DataStream[StationLog] = env.addSource(new MyDefinedSource)
//TODO 1.每隔5秒统计最近5秒钟每个基站的日志数量(滚动窗口)
/*stationLogDS.map(sl=>(sl.sid,1))
.keyBy(_._1)//分组
//设置滚动窗口,窗口的size = 5s
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)//聚合
.print()*/
//TODO 2.每隔5秒统计最近10秒钟每个基站的日志数量(滑动窗口)
/*stationLogDS.map(sl=>(sl.sid,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.sum(1)
.print()*/
//TODO 3.每隔5秒统计最近5秒钟每个基站的日志数量,显示窗口的开始结束时间(滚动窗口)
stationLogDS.map(sl=>(sl.sid,1))
.keyBy(_._1)
//设置滚动窗口,窗口的size = 5s
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//扩展:每隔5秒统计最近10秒钟每个基站的日志数量,显示窗口的开始结束时间(滑动动窗口)
//.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
/**WindowFunction[IN, OUT, KEY, W <: Window]
*IN – The type of the input value.进入窗口中数据的类型
OUT – The type of the output value.计算后输出的数据的类型
KEY – The type of the key. 分组key的数据类型
TimeWindow- 时间窗口
*/
.apply(new WindowFunction[(String,Int),String,String,TimeWindow] {
//定义时间转化类
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
/**实现抽象的apply方法,每个窗口结束时每组数据都会调用一次该方法
* @param key:当前组的key
* @param window:当前窗口对象,封装了窗口的开始结束时间
* @param input:当前窗口下当前组的所有数据
* @param out :通过该对象输出处理后的结果
*/
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
//获取窗口的开始时间
val startTime:String = this.format.format(window.getStart)
//获取窗口的结束时间
val endTIme:String = this.format.format(window.getEnd)
//定义变量sum
var sum = 0
//遍历求和
for(tup<-input.iterator){
sum += tup._2
}
//拼接输出结果
val result = "window["+startTime+"--"+endTIme+"],key:<"+key+">总次数:"+sum
//输出->sink
out.collect(result)
}
}).print()
env.execute("TimeWindowDemo")
}
}