Requirement 1:
Row-to-column transformation: merge multiple rows into a single row, for example:
+--------+----+-----+      +--------+-------+-----+
|username|item|price|      |username|item   |price|
+--------+----+-----+      +--------+-------+-----+
|zhangsan|   A|    1|      |zhangsan|A,B,C,D|   12|
|zhangsan|   B|    2|      |lisi    |A,B,C  |   16|
|zhangsan|   C|    3|      |wangwu  |C      |    8|
|    lisi|   A|    4|      +--------+-------+-----+
|    lisi|   C|    5|
|zhangsan|   D|    6|
|    lisi|   B|    7|
|  wangwu|   C|    8|
+--------+----+-----+
package com.itbaizhan.sql.examples

import org.apache.spark.sql.{DataFrame, SparkSession}

object RowColumnTransfer1 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("RowColumnTransfer1")
      .getOrCreate()
    // 2. Set the log level
    spark.sparkContext.setLogLevel("Error")
    // 3. Read the CSV file
    val df: DataFrame = spark.read.option("header", true)
      .csv("data/demo/rowcolumn.csv")
    // 4. Register a temporary view
    df.createTempView("temp1")
    // 5. Run the query: collapse each user's items into one row
    spark.sql(
      """
        |select username,concat_ws(",",collect_list(item)) item,sum(price) totalprice
        |from temp1
        |group by username
        |""".stripMargin).show(false)
    // 6. Extension: column-to-row, ignoring for now how the total price would be split back
    spark.sql(
      """
        |select username,concat_ws(",",collect_list(item)) cw,sum(price) totalprice
        |from temp1
        |group by username
        |""".stripMargin).createTempView("temp2")
    // |zhangsan|A,B,C,D|   12|
    spark.sql(
      """
        |select username,explode(split(cw,",")) item,totalprice
        |from temp2
        |""".stripMargin).show()
    spark.close()
  }
}
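The group-and-collapse step above can also be sketched in plain Scala (an analogy, not Spark code: per group, `collect_list` + `concat_ws` correspond to `mkString`, and `sum(price)` to `sum`; note that `collect_list` gives no ordering guarantee, while this sketch keeps input order):

```scala
// Plain-Scala analogy of the group-and-collapse query above (not Spark code).
object CollapseSketch {
  case class Row(username: String, item: String, price: Int)

  // Mirrors: group by username, concat_ws(",", collect_list(item)), sum(price)
  def collapse(rows: Seq[Row]): Map[String, (String, Int)] =
    rows.groupBy(_.username).map { case (user, rs) =>
      user -> (rs.map(_.item).mkString(","), rs.map(_.price).sum)
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Row("zhangsan", "A", 1), Row("zhangsan", "B", 2), Row("zhangsan", "C", 3),
      Row("lisi", "A", 4),     Row("lisi", "C", 5),     Row("zhangsan", "D", 6),
      Row("lisi", "B", 7),     Row("wangwu", "C", 8)
    )
    collapse(data).foreach(println)
  }
}
```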
Requirement 2:
Pivot each user's item rows into a single row with one column per item, for example:
+--------+----+-----+      +--------+----+----+---+----+
|username|item|price|      |username|   A|   B|  C|   D|
+--------+----+-----+      +--------+----+----+---+----+
|zhangsan|   A|    1|      |  wangwu|null|null|  8|null|
|zhangsan|   B|    2|      |zhangsan|   1|   2|  3|   6|
|zhangsan|   C|    3|      |    lisi|   4|   7|  5|null|
|    lisi|   A|    4|      +--------+----+----+---+----+
|    lisi|   C|    5|
|zhangsan|   D|    6|
|    lisi|   B|    7|
|  wangwu|   C|    8|
+--------+----+-----+
str_to_map(column, delimiter1, delimiter2): splits the string column on delimiter1 into multiple entries, then splits each entry on delimiter2 into a key/value pair, producing a Map.
> SELECT str_to_map('a:1,b:2,c:3', ',', ':');
{"a":"1","b":"2","c":"3"}
> SELECT str_to_map('a');
{"a":null}
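The two-level split that str_to_map performs can be illustrated with a small plain-Scala sketch (an analogy of its semantics, not Spark's implementation; `strToMap` is a hypothetical helper name, and the delimiters are passed to `String.split`, which treats them as regular expressions):

```scala
// Plain-Scala sketch of str_to_map's semantics (illustration only, not Spark code).
object StrToMapSketch {
  // Split on pairDelim, then split each piece on kvDelim into key/value.
  // A piece with no value part maps to None, like str_to_map('a') -> {"a":null}.
  def strToMap(s: String, pairDelim: String, kvDelim: String): Map[String, Option[String]] =
    s.split(pairDelim).map { pair =>
      pair.split(kvDelim, 2) match {
        case Array(k, v) => k -> Some(v)
        case Array(k)    => k -> None
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    println(strToMap("a:1,b:2,c:3", ",", ":"))
    println(strToMap("a", ",", ":"))
  }
}
```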
map(K1, V1, K2, V2, ...): builds a map from alternating key/value arguments.
> SELECT map(1.0, '2', 3.0, '4');
{1.0:"2",3.0:"4"}
package com.itbaizhan.sql.examples

import org.apache.spark.sql.SparkSession

object RowColumnTransfer2 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("RowColumnTransfer2")
      .master("local[*]")
      .getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame = session.read.option("header", true)
      .csv("data/demo/rowcolumn.csv")
    frame.createTempView("temp1")
    // 1. Concatenate each user's item,price pairs into one string:
    // |username| cw            |
    // |zhangsan|A,1#B,2#C,3#D,6|
    session.sql(
      """
        | select
        |   username,concat_ws("#",collect_list(concat(item,",",price))) as cw
        | from temp1
        | group by username
      """.stripMargin).createTempView("temp2")
    // 2. Turn the string into a map:
    // |username| mp                             |
    // |zhangsan|{A -> 1, B -> 2, C -> 3, D -> 6}|
    session.sql(
      """
        | select
        |   username,str_to_map(cw,"#",",") as mp
        | from temp2
      """.stripMargin).createTempView("temp3")
    // 3. Pull each key out into its own column:
    // |username| A| B| C| D|
    // |zhangsan| 1| 2| 3| 6|
    session.sql(
      """
        | select username,mp['A'] as A,mp['B'] as B,mp['C'] as C,mp['D'] as D
        | from temp3
      """.stripMargin).show()
    // 4. And back again: explode the columns into (item, price) rows
    session.sql(
      """
        | select username,mp['A'] as A,mp['B'] as B,mp['C'] as C,mp['D'] as D
        | from temp3
      """.stripMargin).createTempView("temp4")
    session.sql(
      """
        | select username,item,price
        | from
        |   (select
        |      username,explode(map("A",A,"B",B,"C",C,"D",D)) as (item,price)
        |    from temp4) temp5
        | where price is not null
      """.stripMargin).show(100, false)
    session.close()
  }
}
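The map-based pivot and the explode round trip above can likewise be sketched in plain Scala (an analogy of what temp3's mp column and the final explode do, not Spark code; `pivot`/`unpivot` are hypothetical helper names):

```scala
// Plain-Scala analogy of the map pivot and its explode round trip (not Spark code).
object PivotSketch {
  type Row = (String, String, Int) // (username, item, price)

  // Pivot: build one item->price Map per user, like temp3's mp column
  def pivot(rows: Seq[Row]): Map[String, Map[String, Int]] =
    rows.groupBy(_._1).map { case (u, rs) => u -> rs.map(r => r._2 -> r._3).toMap }

  // Un-pivot: explode each Map back into (username, item, price) rows
  def unpivot(p: Map[String, Map[String, Int]]): Seq[Row] =
    p.toSeq.flatMap { case (u, mp) => mp.toSeq.map { case (i, pr) => (u, i, pr) } }

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      ("zhangsan", "A", 1), ("zhangsan", "B", 2), ("zhangsan", "C", 3),
      ("lisi", "A", 4), ("lisi", "C", 5), ("zhangsan", "D", 6),
      ("lisi", "B", 7), ("wangwu", "C", 8))
    val p = pivot(rows)
    println(p("zhangsan")) // zhangsan's item -> price map
    println(unpivot(p).size) // all 8 rows are recovered
  }
}
```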