大数据全系列 教程
1869个小节阅读:467k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
富函数接口它与其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态,并拥有一些生命周期方法(典型的生命周期方法有:open
和close
方法),所以可以实现更复杂的功能。富函数的接口有:
涉及到的方法:
open()
方法:初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。close()
方法:生命周期中的最后一个调用的方法,做一些清理工作。getRuntimeContext()
方法:提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态。案例演示:读取datas/station_log文件为数据源,通过关联flink_db.tb_phone表将通过记录数据中的主叫和被叫的电话号码改为具体的姓名。
如:StationLog(sid_3,姓名1,姓名2,success,1671693081272,12)
xxxxxxxxxx
package com.itbaizhan.flink.scala.transformation
import com.itbaizhan.flink.scala.source.StationLog
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.sql.{Connection, DriverManager, PreparedStatement}
object RichFunctionClassTransDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//添加隐式转
import org.apache.flink.streaming.api.scala._
//读取本地文件,作为数据源
val fileData: DataStream[String] = env.readTextFile("datas/station_log")
//类型转换
val stationLogDs: DataStream[StationLog] = fileData.map(ele => {
val arr = ele.split(",")
new StationLog(arr(0), arr(1), arr(2), arr(3), arr(4).trim.toLong, arr(5).toLong)
})
stationLogDs//.filter(_.callType.equals("success"))
.map(new CallRichMapFunction())
.print()
env.execute("RichFunctionClassTransDemo")
}
}
//自定义富函数类
class CallRichMapFunction() extends RichMapFunction[StationLog,StationLog]{
private var conn:Connection = _
private var pstat:PreparedStatement = _
//初始化方法中获取数据库连接,和预编译sql语句
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://node1:3306/flink_db","root","123456")
pstat = conn.prepareStatement("select name from tb_phone where phone_number = ?")
}
//每一条通话记录调用一次该方法,在该方法中将主叫和被叫的手机号改为对应的姓名
override def map(sl: StationLog): StationLog = {
//一.查询出主叫的姓名,并修改sl.callOut
//1.1为占位符赋值
pstat.setString(1,sl.callOut)
//1.2查询
val rs1 = pstat.executeQuery()
if(rs1.next()){
//1.3修改
sl.callOut = rs1.getString(1)
}
//二.查询出并叫的姓名,并修改sl.callIn
pstat.setString(1,sl.callIn)
val rs2 = pstat.executeQuery()
if(rs2.next()){
sl.callIn = rs2.getString(1)
}
sl//返回
}
//关闭连接,释放资源
override def close(): Unit = {
if(pstat!=null){
pstat.close()
}
if(conn!=null){
conn.close()
}
}
}