大数据全系列 教程
1869个小节阅读:467.3k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Apache Flink 1.12.0
正式发布,流批一体
真正统一运行!在 DataStream API 上添加了高效的批执行模式的支持。批处理和流处理实现真正统一的运行时的一个重要里程碑。
参考Flink1.14官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
修改流计算词频统计,从本地系统文本文件加载数据,处理数据,设置执行模式为:Batch
。
Java版:
xxxxxxxxxx
package com.itbaizhan.flink.java.base;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
//流批一体的方式:使用StreamExecutionEnvironment.getExecutionEnvironment()构建执行环境对象
public class WordCountStreamBatch {
public static void main(String[] args) throws Exception {
//1.构建执行环境-env 构建执行环境对象改变
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度
env.setParallelism(1);
//设置为批处理执行
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.读取本地文件返回数据源-streamSource
DataStreamSource<String> streamSource = env.readTextFile("datas/words");
//3.数据转换-transformation
//3.1分割单词
SingleOutputStreamOperator<String> words = streamSource.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String line, Collector<String> collector) throws Exception {
//按照空格进行拆分 两个单词之间如果有多个空格的话,会出现空格被认定为单词
//String[] words = line.trim().split(" ");//?
//优化为
String[] words = line.trim().split("\\s+");
//遍历数组,将单词依次添加到collector中
for (String word : words) {
collector.collect(word);
}
}
});
//3.2 将单词转化为二元组 hello -> (hello,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> tupMap = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
//3.3 将相同的key分为一组 0表示二元组中第一个
KeyedStream<Tuple2<String, Integer>, String> tupGroup = tupMap.keyBy(tup -> tup.f0);
//3.4分组进行求和
SingleOutputStreamOperator<Tuple2<String, Integer>> resultSum = tupGroup.sum(1);
//4.数据接收器-sink
resultSum.print();
//5.触发执行-execute 流批一体操作是,即使批处理,也必须触发执行
env.execute("WordCountStreamBatch");
}
}
Scala版:
xxxxxxxxxx
package com.itbaizhan.flink.scala.base
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object WordCountStreamBatch {
def main(args: Array[String]): Unit = {
//导入隐式转换
import org.apache.flink.api.scala._
//1.构建执行环境-env
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//设置为批处理
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
//2.读取本地文件返回数据源-dataSource
val dataSource: DataStream[String] = env.readTextFile("datas/words")
//3.数据转换-transformation
dataSource.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.sum(1)
//4.输出
.print()
//5.触发执行
env.execute("WordCountStreamBatch")
}
}