大数据全系列 教程
1869个小节阅读:464.9k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
UDAF:用户自定义聚合函数。强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0版本以前通过继承UserDefinedAggregateFunction来实现用户自定义弱类型聚合函数。从Spark3.0版本(含该版本)后,它已经不推荐使用了,可以统一采用强类型聚合函数Aggregator。
UDAF原理图
xxxxxxxxxx
package com.itbaizhan.sql.deffun
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**SparkSQL UDAF:User Defined Aggregate Function
* 用户自定义的聚合函数
* 1.自定义类需要继承UserDefinedAggregateFunction类,并重写抽象方法
* 2.明白UDAF的执行原理
* 3.掌握如何重写这些抽象方法
*/
object UserDefUDAFOld {
def main(args: Array[String]): Unit = {
//1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("udf")
.getOrCreate()
//2.创建List数据
val nameList: List[String] = List[String](
"zhangsan", "lisi", "zhangsan", "wangwu", "zhangsan", "lisi","zhangsan")
//4.添加隐式转换
import spark.implicits._
//5.将List对象转换为DataFrame对象
val frame: DataFrame = nameList.toDF("name")
//6.注册临时视图
frame.createOrReplaceTempView("mytable")
//7.注册udaf函数
spark.udf.register("my_count",new MyCount())
//8.调用自定义的函数进行查询操作
//提示:与聚合函数同时出现在select子句中的字段,一定是在group by后面出现过的字段
spark.sql(
"""
|select name,my_count(name)
|from mytable
|group by name
|""".stripMargin).show()
//3.关闭
spark.close()
}
}
//9.自定义UDAF函数类,继承UserDefinedAggregateFunction类
class MyCount extends UserDefinedAggregateFunction{
//输出数据的类型结构
override def inputSchema: StructType
= StructType(List[StructField](
StructField("name",StringType,true)
))
//在聚合过程中处理的数据类型
override def bufferSchema: StructType
= StructType(List[StructField](
StructField("name",IntegerType,true)
))
//最终返回值的类型,和evaluate返回值的类型要一致
override def dataType: DataType = IntegerType
//如果此函数是确定性的,即给定相同的输入,则始终返回相同的输出,则返回true。
override def deterministic: Boolean = true
//作用在map和reduce两侧,为每个分区内的每个分组的数据赋初始化的值0
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
//作用在map端的每个分区内每个分组数据上进行聚合操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit
= buffer.update(0,buffer.getInt(0)+1)
//作用在reduce端,将相同key的value值进行聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
= buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
//聚合之后,每组数据最终结算的结果返回值,类型要和dataType一致
override def evaluate(buffer: Row): Any = buffer.getInt(0)
}
输出结果:
xxxxxxxxxx
+--------+--------------+
| name|my_count(name)|
+--------+--------------+
|zhangsan| 4|
| lisi| 2|
| wangwu| 1|
+--------+--------------+