大数据全系列 教程
1869个小节阅读:465.4k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Flink DataStream API中最底层API,提供process
方法,其中需要实现ProcessFunction
函数类,类似的类还有KeyedProcessFunction
。
ProcessFunction是一个低层次的流处理操作,允许返回所有Stream的基础构建模块:
总而言之,ProcessFunction是Flink最底层的API,也是功能最强大的。
案例:使用process
函数,代替filter
函数,实现对数据过滤操作。
xxxxxxxxxx
package com.itbaizhan.flink.scala.transformation
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object ProcessFunctionDemo {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming流计算的上下文对象
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度
env.setParallelism(1)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取node1通过socket传输过来的数据
val stream: DataStream[String] = env.socketTextStream("node1",8888)
/*stream.filter(new FilterFunction[String] {
override def filter(value: String): Boolean = {
value.trim.length>0
}
}).print()*/
//stream.filter(_.trim.length>0).print()
stream.process(new ProcessFunction[String,String](){
override def processElement(line: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
if(line.trim.length>0){
collector.collect(line)
}
}
}).print()
env.execute("ProcessFunctionDemo")
}
}