大数据全系列 教程
1869个小节阅读:467.1k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
原始数据data/demo/diff_row.csv:
xxxxxxxxxx
id,change,name
id1,1,a
id1,1,b
id1,1,c
id1,2,d
id1,2,e
id1,1,f
id2,2,g
id2,2,h
id2,1,i
id2,1,j
id2,2,k
id3,1,l
id3,1,m
id3,2,n
id3,3,o
id3,4,p
查询从哪一行后数据发生了变化,id和change两个值都分别相等则表示未变化,反之表示变化:
xxxxxxxxxx
id,change,name
id1,1,a
id1,1,b
id1,1,c
id1,2,d 变化了获取上一行
id1,2,e
id1,1,f 变化了获取上一行
id2,2,g
id2,2,h
id2,1,i 变化了获取上一行
id2,1,j
id2,2,k 变化了获取上一行
id3,1,l
id3,1,m
id3,2,n 变化了获取上一行
id3,3,o 变化了获取上一行
id3,4,p 变化了获取上一行
如何才能在id相同的情况下,让下一行和上一行数据比较id和change两列的值是否分别相同呢?
xxxxxxxxxx
id1,1,a id1,1,b
id1,1,b id1,1,c
id1,1,c id1,2,d
id1,2,d id1,2,e
id1,2,e id1,1,f
id1,1,f
id2,2,g id2,2,h
id2,2,h id2,1,i
id2,1,i id2,1,j
id2,1,j id2,2,k
id2,2,k
id3,1,l id3,1,m
id3,1,m id3,2,n
id3,2,n id3,3,o
id3,3,o id3,4,p
id3,4,p
首先我们发现原数据列是按照name列升序排列的,除了id的值外我们还需要为每一组相同的id值的数据按照name列进行排序(值同名次不同,序号不间断),使用row_number()
。
xxxxxxxxxx
select id,change,name,
row_number() over(partition by id order by name ) as rn
from temp
结果如下:
xxxxxxxxxx
+---+------+----+---+
| id|change|name| rn|
+---+------+----+---+
|id1| 1| a| 1|
|id1| 1| b| 2|
|id1| 1| c| 3|
|id1| 2| d| 4|
|id1| 2| e| 5|
|id1| 1| f| 6|
|id2| 2| g| 1|
|id2| 2| h| 2|
|id2| 1| i| 3|
|id2| 1| j| 4|
|id2| 2| k| 5|
|id3| 1| l| 1|
|id3| 1| m| 2|
|id3| 2| n| 3|
|id3| 3| o| 4|
|id3| 4| p| 5|
+---+------+----+---+
然后通过自关联进行查询。
xxxxxxxxxx
select a.id,a.change,a.name
from tb1 a join tb1 b on a.id = b.id
where a.change != b.change and a.rn = b.rn-1
代码编写:
xxxxxxxxxx
package com.itbaizhan.sql.examples
import org.apache.spark.sql.{DataFrame, SparkSession}
object FindChangeRow {
def main(args: Array[String]): Unit = {
//1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("OpenWindowFunction")
.getOrCreate()
//设置日志的级别
spark.sparkContext.setLogLevel("Error")
//3.读取csv文件
val df: DataFrame = spark.read.option("header", true)
.csv("data/demo/diff_row.csv")
//4.注册为临时视图
df.createTempView("temp")
//5.为每行数据添加一列row_number
spark.sql(
"""
|select id,change,name,
|row_number() over(partition by id order by name) as rn
|from temp
|""".stripMargin).createTempView("tb1")
//6.自身关联查询
spark.sql(
"""
|select a.id,a.change,a.name
|from tb1 a join tb1 b on a.id = b.id
|where a.change != b.change
|and a.rn = b.rn-1
|""".stripMargin).show()
//2.关闭
spark.close()
}
}
运行结果:
xxxxxxxxxx
+---+------+----+
| id|change|name|
+---+------+----+
|id1| 1| c|
|id1| 2| e|
|id2| 2| h|
|id2| 1| j|
|id3| 1| m|
|id3| 2| n|
|id3| 3| o|
+---+------+----+