大数据全系列 教程
1869个小节阅读:468k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
通信基站日志数据样例类
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
/**通信基站日志数据
* @param sid 基站ID
* @param callOut 主叫号码
* @param callIn 被叫号码
* @param callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success):
* @param callTime 呼叫时间戳,精确到毫秒
* @Param duration 通话时长 单位:秒
*/
case class StationLog(sid:String,callOut:String,callIn:String,callType:String,callTime:Long,duration:Long)
CollectionSourceDemo:
xxxxxxxxxx
package com.itbaizhan.flink.scala.source
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object CollectionSourceDemo {
def main(args: Array[String]): Unit = {
//构建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)
//导入隐式转换
import org.apache.flink.api.scala._
//fromElements演示
val eleDs: DataStream[String] = env.fromElements("hive", "scala", "spark", "flink")
//eleDs.print()
val eleIntDs: DataStream[Int] = env.fromElements(1, 2, 3, 4)
//eleIntDs.print()
//fromSequence(from: Long, to: Long): [1,5]
val numDs: DataStream[Long] = env.fromSequence(1, 5)
//numDs.print()
//集合对象
val collCaseObjDS: DataStream[StationLog] = env.fromCollection(Array[StationLog](
StationLog("1","18612345670","17312345679","success",System.currentTimeMillis(),86L),
StationLog("2","18612345671","17312345678","success",System.currentTimeMillis(),86L),
StationLog("3","18612345672","17312345677","busy",System.currentTimeMillis(),86L),
StationLog("4","18612345673","17312345676","success",System.currentTimeMillis(),86L),
StationLog("5","18612345674","17312345675","busy",System.currentTimeMillis(),86L),
))
//collCaseObjDS.print()
//传递过来的是字符串,转化为对象
val collStringDS: DataStream[String] = env.fromCollection(Array[String](
"01,18612345670,17312345679,success,"+System.currentTimeMillis()+",96",
"02,18612345671,17312345678,success,"+System.currentTimeMillis()+",95",
"03,18612345672,17312345677,success,"+System.currentTimeMillis()+",94",
"04,18612345673,17312345676,success,"+System.currentTimeMillis()+",93",
"05,18612345674,17312345675,success,"+System.currentTimeMillis()+",92"
))
val resultDs: DataStream[StationLog] = collStringDS.map(ele => {
val arr: Array[String] = ele.split(",")
StationLog(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5).toLong)
})
resultDs.print()
//触发执行
env.execute("CollectionSourceDemo")
}
}