大数据全系列 教程
1869个小节阅读:466.9k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:循环创建多个RDD,再将RDD放入队列,然后通过SparkStreaming创建DStream,最后计算WordCount。
测试过程中,可以通过使用ssc.queueStream(queueRdds)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
xxxxxxxxxx
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
): InputDStream[T] = {
queueStream(queue, oneAtATime, sc.makeRDD(Seq.empty[T], 1))
}
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RddCreateDStream {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val conf = new SparkConf().
setMaster("local[*]")
.setAppName("RddCreateDStreamWC")
//2.初始化SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
//3.创建RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.获取InputDStream,oneAtATime 是否只应使用一个RDD
val inputDStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的RDD数据
val mapStream = inputDStream.map((_,1))
val redStream = mapStream.reduceByKey(_ + _)
//6.打印结果
redStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向RDD队列中放入RDD
for (i <- 1 to 6) {
rddQueue += ssc.sparkContext.makeRDD(0 to 9)
Thread.sleep(2000)
}
//9.等待执行停止
ssc.awaitTermination()
}
}
实时效果反馈
1. 关于queueStream函数的描述,错误的是:
def queueStream[T: ClassTag](queue: Queue[RDD[T]],oneAtATime: Boolean = true): InputDStream[T]= {......}
A queue: Queue[RDD[T]
Queue是scala.collection.mutable包下的。
B 参数oneAtATime
默认值为true,表示一批次应使用一个RDD。
C 参数oneAtATime
默认值为false,表示一批次不应使用一个RDD。
答案:
1=>C 默认值为true