大数据全系列 教程
1869个小节阅读:467.3k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
滑动计数窗口案例:每隔2条数据,统计在最近5条消息中, 数字之和sum值
xxxxxxxxxx
package com.itbaizhan.flink.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
object SlidingCountWindowDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
//通过socket读取node3:8888传递过来的数据
val stream: DataStream[String] = env.socketTextStream("node3", 8888)
//过滤非法数据
val etlDS: DataStream[Int] = stream.filter(_.trim.length > 0)
//数据类型转换
.map(_.toInt)
//TODO 1.基本累加操作 winSize=5,slide=2
//设置滑动窗体 winSize=5,slide=2
/*etlDS.countWindowAll(5,2)
//基本累加求和,并输出
.sum(0).print()*/
//TODO 2.累加操作后格式化 winSize=5,slide=2
//设置滑动窗体winSize=5,slide=2
etlDS.countWindowAll(5,2)
//设置窗口函数,对窗口中的数据求和 AllWindowFunction[IN, OUT, W <: Window]
.apply(new AllWindowFunction[Int,String,GlobalWindow] {
override def apply(window: GlobalWindow, input: Iterable[Int], out: Collector[String]): Unit = {
var sum:Int = 0
//遍历求和
for(ele<-input.iterator){
sum += ele
}
//输出
out.collect("最后窗口元素的和(winSize=5,slide=2):"+sum)
}
}).print()
env.execute("SlidingCountWindowDemo")
}
}