大数据全系列 教程
1869个小节阅读:467k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingStopDemo {
def createSSC(): StreamingContext = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingStop")
//设置优雅的关闭
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("./ckp")
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ckp", () => createSSC())
new Thread(new StreamingStop(ssc)).start()
val line: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
line.print()
ssc.start()
ssc.awaitTermination()
}
}
xxxxxxxxxx
package com.itbaizhan.streaming
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
import java.net.URI
class StreamingStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://node2:9820"),
new Configuration(), "root")
while (true) {
try
Thread.sleep(5000)
catch {
case e: InterruptedException =>
e.printStackTrace()
}
val state: StreamingContextState = ssc.getState
if (state == StreamingContextState.ACTIVE) {
val bool: Boolean = fs.exists(new Path("hdfs://node2:9820/stopSpark"))
if (bool) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
测试
启动hadoop集群
在node1上:nc -lk 9999
运行程序
在node2:
xxxxxxxxxx
[root@node2 ~]# touch stopSpark
在node1上:
xxxxxxxxxx
[root@node1 ~]# nc -lk 9999
aa
bb
cc
dd
在node2:
xxxxxxxxxx
[root@node2 ~]# hdfs dfs -put stopSpark /
IDEA控制台看日志