大数据全系列 教程
1869个小节阅读:467.7k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:自定义数据源,实现监控指定的端口号,获取该端口号内容。
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
class ReceiverCustomer(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
//最初启动的时候,调用该方法
//作用:读数据并将数据发送给Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
override def onStop(): Unit = {}
//读数据并将数据发送给Spark
def receive(): Unit = {
//创建一个Socket
var socket: Socket = new Socket(host, port)
//定义一个变量,用来接收端口传过来的数据
var input: String = null
//创建一个BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//读取数据
input = reader.readLine()
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart("restart")
}
}
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CustomerSource {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("CustomerSource")
//2.初始化
val ssc = new StreamingContext(sparkConf, Seconds(5))
//3.创建自定义receiver的Streaming
val lines = ssc.receiverStream(new ReceiverCustomer("node1", 9999))
lines.print()
//4.启动
ssc.start()
ssc.awaitTermination()
}
}
测试
在node1上
xxxxxxxxxx
[root@node1 ~]# nc -lk 9999
在IDEA中运行程序
在node1上
xxxxxxxxxx
[root@node1 ~]# nc -lk 9999
aa
bb
cc
查看IDEA控制台
实时效果反馈
1. 关于SparkStreaming接收器自定义数据源的描述,错误的是:
A 需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
B Xxx extends Receiver[String](StorageLevel.MEMORY_ONLY)
接收到数据仅保存在内存中。
C onStart()
最初启动的时候,调用该方法;作用是读数据并将数据发给Spark。
D onStop()
不能空实现。
答案:
1=>D 可以空实现