大数据全系列 教程
1869个小节阅读:467.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
假设数据流批处理间隔(batchInterval)为5s ,上图中窗口长度(window length )为15s,窗口滑动间隔(sliding interval)10s。窗口长度和滑动间隔必须是batchInterval的整数倍,如果不是整数倍会检测报错。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
: 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
xxxxxxxxxx
//reduceFunc–结合和交换reduce函数
//windowDuration–窗口长度;必须是此数据流批处理间隔的倍数
//slideDuration–窗口的滑动间隔,即新数据流生成RDD的间隔
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)] = ssc.withScope {
//partitioner–用于控制新数据流中每个RDD分区的分区器
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
代码编写:
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
object StreamingWindowFun1 {
def main(args: Array[String]): Unit = {
//1.初始化SparkConf类的对象
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamingWordCount")
//2.创建StreamingContext对象 数据批处理的间隔时间为5S
val ssc = new StreamingContext(conf, Seconds(5))
//3.通过监控node1的9999端口创建DStream对象
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("node1", 9999)
//4.将每一行数据做切分,形成一个个单词
val wordsDS: DStream[String] = lines.flatMap(_.split(" "))
//5.word=>(word,1)
val wordOne: DStream[(String, Int)] = wordsDS.map((_, 1))
//6.将相同的key的value做聚合加
//窗口的长度和滑动窗口间隔必须是批处理的间隔时间整数倍
val wordCount: DStream[(String, Int)] = wordOne.reduceByKeyAndWindow(
(v1:Int,v2:Int)=>{v1+v2},
Durations.seconds(15),//窗口的长度
Durations.seconds(10)//滑动窗口间隔
)
//7.打印输出
wordCount.print()
//8.启动
ssc.start()
//9.等待执行停止
ssc.awaitTermination()
}
}
测试
在node1上
xxxxxxxxxx
[root@node1 ~]# nc -lk 9999
在IDEA中运行程序
在node1上
xxxxxxxxxx
[root@node1 ~]# nc -lk 9999
a b c a
a b c d
a b c f
a b c e
a b c h