大数据全系列 教程
1869个小节阅读:468k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,需要先进行分区,然后再做聚合。
keyBy [DataStream->KeyedStream]
在Flink中如果是批处理,分组使用函数:groupBy
,从Flink 1.12以后开始,由于流批一体,无论是流计算还是批处理,分组函数:keyBy
。
在使用keyBy函数时,可以指定下标索引
(数据类型为元组)、指定属性名称
(数据类型为样例类)。
keyBy算子表示:按照指定的key来对流中的数据进行分组,分组后流称为KeyedStream
,要么聚合操作(调用reduce、fold或aggregate函数等等),要么进行窗口操作window。
聚合算子
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以_1、_2、_3、…
来命名的。
xxxxxxxxxx
package com.itbaizhan.flink.scala.transformation
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object TransformationAggregationDemo {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
import org.apache.flink.api.scala._
val ds: DataStream[(String, String, Int)] = env.fromElements(("a", "a2", 1), ("a", "a1", 2), ("b", "b2", 4), ("b", "b1", 3), ("c", "c1", 5))
//按照二元组的第一个元素分组
val keyedStream: KeyedStream[(String, String, Int), String] = ds.keyBy(_._1)
//输出
//keyedStream.print()
//对元组的索引 2 位置(也就是第三个位置)求最大值 (a,a2,2)
//keyedStream.max(2).print()
//keyedStream.max("_3").print()
//对元组的索引 2 位置(也就是第三个位置)求最大值 (a,a1,2)
//keyedStream.maxBy("_3").print()
//对元组的索引 2 位置(也就是第三个位置)求最小值 (b,b2,3)
//keyedStream.min(2).print()
//keyedStream.min("_3").print()
//对元组的索引 2 位置(也就是第三个位置)求最大值 (b,b1,3)
keyedStream.minBy("_3").print()
//对元组的索引 2 位置数据求和
//keyedStream.sum(2).print()
//对元组的第 3 个位置数据求和。等价于sum(2)
//keyedStream.sum("_3").print()
//reduce 算子,仅仅针对DataStream被keyBy分组后KeyedStream数据进行聚合
/*keyedStream.reduce((x:(String, String, Int),y:(String, String, Int))=>{
(x._1,x._2,x._3+y._3)
}).print()*/
env.execute("TransformationAggregationDemo")
}
}