大数据全系列 教程
1869个小节阅读:467.6k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:每隔5秒随机生成十条10条基站信息(基站ID、主叫号码、被叫号码、通话类型、呼叫时间戳、通话时长)
实现SourceFunction
接口或ParallelSourceFunction
接口
sid 基站ID callOut 主叫号码 callIn 被叫号码 callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success): callTime 呼叫时间戳,精确到毫秒 duration 通话时长 单位:秒
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Random
/**需求:每隔5秒随机生成十条10条基站信息(基站ID、主叫号码、被叫号码、通话类型、呼叫时间戳、通话时长)
* 实现`SourceFunction`接口或`ParallelSourceFunction`接口
*/
//实现SourceFunction
class MyDefinedSource extends SourceFunction[StationLog]{
//是否继续生产数据
private var isMakeData = true
private val random = new Random()
//所有的通话类型:呼叫失败(fail),占线(busy),拒接(barring),接通(success)
private val callTypes: Array[String] = Array[String]("fail", "busy", "barring", "success")
override def run(sourceContext: SourceFunction.SourceContext[StationLog]): Unit = {
while(isMakeData){
//生成数据
1.to(10).map(i=>{
StationLog("sid_"+i,"1341234%04d".format(random.nextInt(8)),
"1731234%04d".format(random.nextInt(8)),
callTypes(random.nextInt(callTypes.length)),
System.currentTimeMillis(),1+random.nextInt(100)
)
}).foreach(ele=>{//给下一个任务算子发送数据
sourceContext.collect(ele)
})
Thread.sleep(5000)
}
}
/*override def cancel(): Unit = {
isMakeData = false
}*/
//省略写法,与上面注释掉的cancel方法等价
override def cancel(): Unit = isMakeData = false
}
//实现ParallelSourceFunction
class MyDefinedSource2 extends ParallelSourceFunction[StationLog]{
//是否继续生产数据
private var isMakeData = true
private val random = new Random()
//所有的通话类型:呼叫失败(fail),占线(busy),拒接(barring),接通(success)
private val callTypes: Array[String] = Array[String]("fail", "busy", "barring", "success")
override def run(sourceContext: SourceFunction.SourceContext[StationLog]): Unit = {
while(isMakeData){
//生成数据
1.to(10).map(i=>{
StationLog("sid_"+i,"1341234%04d".format(random.nextInt(8)),
"1731234%04d".format(random.nextInt(8)),
callTypes(random.nextInt(callTypes.length)),
System.currentTimeMillis(),1+random.nextInt(100)
)
}).foreach(ele=>{//给下一个任务算子发送数据
sourceContext.collect(ele)
})
Thread.sleep(5000)
}
}
/*override def cancel(): Unit = {
isMakeData = false
}*/
//省略写法,与上面注释掉的cancel方法等价
override def cancel(): Unit = isMakeData = false
}
object CustomSourceInterfaceDemo {
def main(args: Array[String]): Unit = {
//构建执行环境对象
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.setParallelism(3)
//添加隐式转
import org.apache.flink.streaming.api.scala._
/*val sourceFunctionDs: DataStream[StationLog] = env.addSource(new MyDefinedSource())
println("sourceFunctionDs并行度:"+sourceFunctionDs.parallelism)
//输出并行度1
sourceFunctionDs.print()*/
val parallelSourceFunctionDS = env.addSource(new MyDefinedSource2)
//并行度是当前机器的CPU核数20
println("parallelSourceFunctionDS并行度:"+parallelSourceFunctionDS.parallelism)
parallelSourceFunctionDS.print()
//触发执行
env.execute("CustomSourceInterfaceDemo")
}
}