大数据全系列 教程
1869个小节阅读:468k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
主要是读取Hive中的ODS层 TO_SONG_INFO_D
表生成 TW层TW_SONG_BASEINFO_D
表.
创建类com.itbaizhan.scala.musicproject.eds.content.GenerateTwSongBaseinfoD
定义函数getAlbumName
,实现专辑名称ALBUM
的获取
1.大部分专辑数据格式:[{"name":"《不许你注定一人》","id":"5c19b89450057d3b20232f7a"}] =>"不许你注定一人"
2.部分数据格式:《LANDIVA》=>"LANDIVA"
3.部分数据格式:[]、空 => "暂无专辑"
xxxxxxxxxx
//定义函数`getAlbumName`,实现专辑名称`ALBUM`的获取
val getAlbumName: String => String = (albumInfo:String)=>{
//定义专辑名称的变量
var albumName = ""
try{
//[{"name":"《不许你注定一人》","id":"5c19b89450057d3b20232f7a"}] =>"不许你注定一人"
//将字符串转化为json数组
val jsonArray = JSON.parseArray(albumInfo)
//从数组中的第一个json对象中获取key为name的值
albumName = jsonArray.getJSONObject(0).getString("name")
}catch {
case e:Exception=>{
if(albumInfo.contains("《")&&albumInfo.contains("》")){
//部分数据格式:《LANDIVA》=>"LANDIVA"
albumName = albumInfo.substring(albumInfo.indexOf('《'),albumInfo.indexOf('》')+1)
}else{
//部分数据格式:[]、空 => "暂无专辑"
albumName = "暂无专辑"
}
}
}
//返回专辑名称
albumName
}
定义函数getPostTime
,实现发行时间POST_TIME
的获取
调用DateUtils.formatDate(stringDate:String):String即可
xxxxxxxxxx
val getPostTime : String => String = (postTime:String) =>{
DateUtils.formatDate(postTime)
}
定义函数getSingerInfo
,实现歌手名称或歌手id的获取
1.[{"name":"李维真","id":"18013"},{"name":"谢丹","id":"15293"}]
2.[{"name":"方大同","id":"6899"}]
3.部分数据格式:[]、空 => ""
xxxxxxxxxx
//定义函数`getSingerInfo`,实现歌手名称或歌手id的获取
//第一个参数为歌手信息:[{"name":"苏打绿","id":"15828"}]
//第二个参数:singer1或singer2
//第三个参数:name或id
//返回值是歌手的名称或id
val getSingerInfo:(String,String,String) =>String = (singerInfo:String,singer:String,nameOrId:String) =>{
var singerNameOrSingerId = ""
//1.[{"name":"李维真","id":"18013"},{"name":"谢丹","id":"15293"}]
//2.[{"name":"方大同","id":"6899"}]
//3.部分数据格式:[]、空 => ""
try{
//将字符串转化json数组
val jsonArray = JSON.parseArray(singerInfo)
if("singer1".equals(singer)&&"name".equals(nameOrId)&&jsonArray.size()>0){
singerNameOrSingerId = jsonArray.getJSONObject(0).getString("name")
}else if("singer1".equals(singer)&&"id".equals(nameOrId)&&jsonArray.size()>0){
singerNameOrSingerId = jsonArray.getJSONObject(0).getString("id")
}else if("singer2".equals(singer)&&"name".equals(nameOrId)&&jsonArray.size()>1){
singerNameOrSingerId = jsonArray.getJSONObject(1).getString("name")
}else if("singer2".equals(singer)&&"id".equals(nameOrId)&&jsonArray.size()>1){
singerNameOrSingerId = jsonArray.getJSONObject(1).getString("id")
}
}catch{
case e:Exception=>{
singerNameOrSingerId
}
}
singerNameOrSingerId
}
定义函数getAuthCompany
,实现授权公司AUTH_CO
的获取
1.{"name":"乐心曲库","id":"5aa740ecb19efe0b504ce30a"}=>"乐心曲库"
2.部分数据格式:空 => "乐心曲库"
xxxxxxxxxx
//定义函数`getAuthCompany`,实现授权公司`AUTH_CO`的获取
val getAuthCompany:String=>String = (authCompanyInfo:String)=>{
//1.{"name":"乐心曲库","id":"5aa740ecb19efe0b504ce30a"}=>"乐心曲库"
//2.部分数据格式:空 => "乐心曲库"
var authCompanyName = "乐心曲库"
try{
//将json字符串转化为json对象
val jsonObj = JSON.parseObject(authCompanyInfo)
//从json对象中获取name对应的value
authCompanyName = jsonObj.getString("name")
}catch {
case e:Exception=>{
authCompanyName
}
}
authCompanyName
}
定义函数getPrdctType
,实现产品类型PRDCT_TYPE
的获取
[8.0,2.0,9]=>ListBuffer[Int]()
xxxxxxxxxx
//定义函数`getPrdctType`,实现产品类型`PRDCT_TYPE`的获取
val getPrdctType:(String=>ListBuffer[Int]) = (productTypeInfo:String)=>{
//[8.0,2.0,9]=>ListBuffer[Int]()
val listBuf = new ListBuffer[Int]()
//不为空时
if(!"".equals(productTypeInfo.trim)){
//去掉中括号[],再按照逗号拆分
val strings: Array[String] = productTypeInfo.stripPrefix("[").stripSuffix("]").split(",")
strings.foreach(ele=>{
listBuf.append(ele.toDouble.toInt)
})
}
listBuf
}
在当前类的main方法中根据是否本地运行构建SparkSession对象
xxxxxxxxxx
def main(args: Array[String]): Unit = {
//在当前类的main方法中根据是否本地运行构建SparkSession对象
if(ConfigUtils.LOCAL_RUN){//本地运行
sparkSession = SparkSession.builder()
.master("local")
.config("hive.metastore.uris",ConfigUtils.HIVE_METASTORE_URIS)
.enableHiveSupport().getOrCreate()
}else{//集群运行
sparkSession = SparkSession.builder()
.enableHiveSupport().getOrCreate()
}
//后续代码...
}
导入函数,可以使用 udf、col 方法
xxxxxxxxxx
import org.apache.spark.sql.functions._
构建转化数据的udf
xxxxxxxxxx
//构建转化数据的udf
val udfGetAlbumName : UserDefinedFunction = udf(getAlbumName)
val udfGetPostTime : UserDefinedFunction = udf(getPostTime)
val udfGetSingerInfo : UserDefinedFunction = udf(getSingerInfo)
val udfGetAuthCompany : UserDefinedFunction = udf(getAuthCompany)
val udfGetPrdctType : UserDefinedFunction = udf(getPrdctType)
切换Hive的命名空间
xxxxxxxxxx
sparkSession.sql(s"use ${ConfigUtils.HIVE_DATABASE}")
读取Hive表TO_SONG_INFO_D
,使用如上定义的UDF函数新增或替换对应的字段,然后注册为临时视图TEMP_TO_SONG_INFO_D
xxxxxxxxxx
sparkSession.table("TO_SONG_INFO_D")
//使用如上定义的UDF函数替换对应的字段
.withColumn("ALBUM",udfGetAlbumName(col("ALBUM")))
.withColumn("POST_TIME",udfGetPostTime(col("POST_TIME")))
.withColumn("AUTH_CO",udfGetPostTime(col("AUTH_CO")))
.withColumn("PRDCT_TYPE",udfGetPostTime(col("PRDCT_TYPE")))
//使用如上定义的UDF函数新增对应的字段
.withColumn("SINGER1",udfGetSingerInfo(col("SINGER_INFO"),lit("singer1"),lit("name")))
.withColumn("SINGER1ID",udfGetSingerInfo(col("SINGER_INFO"),lit("singer1"),lit("id")))
.withColumn("SINGER2",udfGetSingerInfo(col("SINGER_INFO"),lit("singer2"),lit("name")))
.withColumn("SINGER2ID",udfGetSingerInfo(col("SINGER_INFO"),lit("singer2"),lit("id")))
//然后注册为临时视图`TEMP_TO_SONG_INFO_D`
.createTempView("TEMP_TO_SONG_INFO_D")
清洗数据,将结果保存到 Hive的TW_SONG_BASEINFO_D
表中
xxxxxxxxxx
sparkSession.sql(
"""
| select NBR,
| nvl(NAME,OTHER_NAME) as NAME,
| SOURCE,
| ALBUM,
| PRDCT,
| LANG,
| VIDEO_FORMAT,
| DUR,
| SINGER1,
| SINGER2,
| SINGER1ID,
| SINGER2ID,
| 0 as MAC_TIME,
| POST_TIME,
| PINYIN_FST,
| PINYIN,
| SING_TYPE,
| ORI_SINGER,
| LYRICIST,
| COMPOSER,
| BPM_VAL,
| STAR_LEVEL,
| VIDEO_QLTY,
| VIDEO_MK,
| VIDEO_FTUR,
| LYRIC_FTUR,
| IMG_QLTY,
| SUBTITLES_TYPE,
| AUDIO_FMT,
| ORI_SOUND_QLTY,
| ORI_TRK,
| ORI_TRK_VOL,
| ACC_VER,
| ACC_QLTY,
| ACC_TRK_VOL,
| ACC_TRK,
| WIDTH,
| HEIGHT,
| VIDEO_RSVL,
| SONG_VER,
| AUTH_CO,
| STATE,
| case when size(PRDCT_TYPE) =0 then NULL else PRDCT_TYPE end as PRDCT_TYPE
| from TEMP_TO_SONG_INFO_D
| where NBR != ''
""".stripMargin).write.format("Hive").mode(SaveMode.Overwrite).saveAsTable("TW_SONG_BASEINFO_D")
println("**** all finished ****")