大数据全系列 教程
1869个小节阅读:464.8k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
案例需求:计算每个基站的呼叫间隔时间(只需考虑callTime差值即可),单位是毫秒。--使用ValueState
测试数据:
[root@node3 ~]# nc -lk 8888 sid_1,success,2030-12-01 10:10:01,1 sid_1,success,2030-12-01 10:10:02,1 sid_1,success,2030-12-01 10:10:04,1 sid_1,success,2030-12-01 10:10:07,1 sid_1,success,2030-12-01 10:10:11,1
测试结果:
第一次处理 sid_1,success,2030-12-01 10:10:01,1 基站sid_1 两次通话的间隔:1000 基站sid_1 两次通话的间隔:2000 基站sid_1 两次通话的间隔:3000 基站sid_1 两次通话的间隔:4000
案例代码:
xxxxxxxxxx
package com.itbaizhan.flink.scala.state
import com.itbaizhan.flink.scala.time.Station
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
object CallDiffDurationsKeyState {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
def main(args: Array[String]): Unit = {
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//1.构建执行环境-env
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局的并行度
env.setParallelism(1)
val linesDS = env.socketTextStream("node3", 8888)
//过滤空行
val stationDS:DataStream[Station] = linesDS.filter(_.trim.length > 0)
//类型转换
.map(line => {
val array = line.trim.split(",")
//sid:String,callType:String,callTime:Long,duration:Long
Station(array(0), array(1), format.parse(array(2)).getTime, array(3).toLong)
})
//按照基站id分组
val keyedStream: KeyedStream[Station, String] = stationDS.keyBy(_.sid)
//TODO 键控状态管理方式一:KeyedProcessFunction<K, I, O>
//<K> – Type of the key. sid的类型
//<I> – Type of the input elements. Station
//<O> – Type of the output elements.
/*keyedStream.process(new KeyedProcessFunction[String,Station,String] {
//为当前基站设置一个状态,状态中保存的是上一条数据的通话时间
private var timeState:ValueState[Long] = _
//初始化工作,每一组数据调用一次该方法
override def open(parameters: Configuration): Unit = {
//初始化状态
timeState = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timeState",classOf[Long]))
}
//当前组内的每一条数据都会调用一次该方法
override def processElement(station: Station, ctx: KeyedProcessFunction[String, Station, String]#Context, out: Collector[String]): Unit = {
//判断第一次获取状态的值,默认值0L
if(timeState.value()==0){
//没有状态,更新状态值
timeState.update(station.callTime)
out.collect(s"第一次处理${station}")
}else{
//获取当前这组key的上一条日志的通话时间
val preCallTime: Long = timeState.value()
//当前条日志的通话时间
val currentCallTime: Long = station.callTime
//更新状态值
timeState.update(currentCallTime)
out.collect(s"当前基站:${station.sid},两次通话的时间间隔为:"+(currentCallTime-preCallTime)+"毫秒")
}
}
}).printToErr()*/
/**TODO 键控状态管理实现方式二
* sl:Station,option:Option[Long]
* sl:进入数据对象
* option:Option[Long] 当前key对应上一次保存的状态值
*/
keyedStream.mapWithState((sl:Station,option:Option[Long])=>{
if(option.isEmpty){
//没有状态,更新状态值
(s"第一次处理${sl}",Option(sl.callTime))
}else{
//有状态,获取状态
val preCallTime = option.get
(s"当前基站:${sl.sid},两次通话的时间间隔为${sl.callTime-preCallTime}",Option(sl.callTime))
}
}).print()
//触发执行
env.execute("CallDiffDurationsKeyState")
}
}