大数据全系列 教程
1869个小节阅读:467.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
基于客户端歌曲播放表:“TO_CLIENT_SONG_PLAY_OPERATE_REQ_D”和歌曲基本信息日全量表:“TW_SONG_BASEINFO_D”生成歌曲特征日统计表:“TW_SONG_FTUR_D”,主要是按照两张表的歌曲ID进行关联,主要统计出歌曲在当天、7天、30天内的点唱信息和点赞信息。
创建类:com.itbaizhan.scala.musicproject.eds.content.GenerateTwSongFturD.scala
判断参数是否合法
xxxxxxxxxx
def main(args: Array[String]): Unit = {
if(args.length < 1) {
println(s"请输入数据日期,格式例如:年月日(20301011)")
System.exit(1)
}
//后续代码..
}
根据是否本地运行,构造SparkSession对象
xxxxxxxxxx
if(ConfigUtils.LOCAL_RUN){
sparkSession = SparkSession.builder().master("local")
.appName("Generate_TW_Song_Ftur_D")
.config("spark.sql.shuffle.partitions","1")
.config("hive.metastore.uris",ConfigUtils.HIVE_METASTORE_URIS)
.enableHiveSupport()
.getOrCreate()
sparkSession.sparkContext.setLogLevel("Error")
}else{
sparkSession = SparkSession.builder()
.appName("Generate_TW_Song_Ftur_D")
.enableHiveSupport().getOrCreate()
}
计算前7天、前30天的日期
xxxxxxxxxx
//输入数据的日期 ,格式 :年月日 yyyymmdd
val analyticDate = args(0)
//获取当前日期的前7天 日期
val per7Date = DateUtils.getCurrentDatePreDate(analyticDate,7)
//获取当前日期的前30天日期
val per30Date = DateUtils.getCurrentDatePreDate(analyticDate,30)
println(s"输入的日期为 :${analyticDate}")
println(s"per7Date = ${per7Date}")
println(s"per30Date = ${per30Date}")
切换Hive的命名空间
xxxxxxxxxx
sparkSession.sql(s"use ${ConfigUtils.HIVE_DATABASE}")
基于TO_CLIENT_SONG_PLAY_OPERATE_REQ_D
当前数据统计当天歌曲点唱情况
xxxxxxxxxx
sparkSession.sql(
s"""
| select
| songid as NBR, --歌曲ID
| count(*) as SING_CNT, --当日点唱量
| 0 as SUPP_CNT , --当日点赞量
| count(distinct uid) as USR_CNT, --当日点唱用户数
| count(distinct order_id) as ORDR_CNT --当日点唱订单数
| from TO_CLIENT_SONG_PLAY_OPERATE_REQ_D
| where data_dt = ${analyticDate}
| group by songid
""".stripMargin).createTempView("currentDayTable")
基于TO_CLIENT_SONG_PLAY_OPERATE_REQ_D
当前数据统计近7日歌曲点唱情况
xxxxxxxxxx
sparkSession.sql(
s"""
| select
| songid as NBR, --歌曲ID
| count(*) as RCT_7_SING_CNT, --近七天点唱量
| 0 as RCT_7_SUPP_CNT , --近七天点赞量
| count(distinct uid) as RCT_7_USR_CNT, --近七天点唱用户数
| count(distinct order_id) as RCT_7_ORDR_CNT --近七天点唱订单数
| from to_client_song_play_operate_req_d
| where ${per7Date}<= data_dt and data_dt <= ${analyticDate}
| group by songid
""".stripMargin).createTempView("pre7DayTable")
基于TO_CLIENT_SONG_PLAY_OPERATE_REQ_D
当前数据统计近30日歌曲点唱情况
xxxxxxxxxx
sparkSession.sql(
s"""
| select
| songid as NBR, --歌曲ID
| count(*) as RCT_30_SING_CNT, --近三十天点唱量
| 0 as RCT_30_SUPP_CNT , --近三十天点赞量
| count(distinct uid) as RCT_30_USR_CNT, --近三十天点唱用户数
| count(distinct order_id) as RCT_30_ORDR_CNT --近三十天点唱订单数
| from to_client_song_play_operate_req_d
| where ${per30Date}<= data_dt and data_dt <= ${analyticDate}
| group by songid
""".stripMargin).createTempView("pre30DayTable")
从TW_SONG_FTUR_D
表中获取过去7天和30天中 每首歌曲的 最高点唱量及点赞量
xxxxxxxxxx
sparkSession.sql(
s"""
| select
| NBR, --歌曲ID
| max(case when DATA_DT BETWEEN ${per7Date} and ${analyticDate} then SING_CNT else 0 end) as RCT_7_TOP_SING_CNT, --近七天最高日点唱量
| max(case when DATA_DT BETWEEN ${per7Date} and ${analyticDate} then SUPP_CNT else 0 end) as RCT_7_TOP_SUPP_CNT, --近七天最高日点赞量
| max(SING_CNT) as RCT_30_TOP_SING_CNT, --近三十天最高日点唱量
| max(SUPP_CNT) as RCT_30_TOP_SUPP_CNT --近三十天最高日点赞量
| from TW_SONG_FTUR_D
| where DATA_DT BETWEEN ${per30Date} and ${analyticDate}
| group by NBR
""".stripMargin).createTempView("pre7And30DayInfoTable")
然后在将以上4步得到的结果和TW_SONG_BASEINFO_D关联查询得到最终的结果
xxxxxxxxxx
sparkSession.sql(
s"""
|select
| A.NBR, --歌曲编号
| B.NAME, --歌曲名称
| B.SOURCE, --来源
| B.ALBUM, --所属专辑
| B.PRDCT, --发行公司
| B.LANG, --语言
| B.VIDEO_FORMAT, --视频风格
| B.DUR, --时长
| B.SINGER1, --歌手1
| B.SINGER2, --歌手2
| B.SINGER1ID, --歌手1ID
| B.SINGER2ID, --歌手2ID
| B.MAC_TIME, --加入机器时间
| A.SING_CNT, --当日点唱量
| A.SUPP_CNT, --当日点赞量
| A.USR_CNT, --当日点唱用户数
| A.ORDR_CNT, --当日点唱订单数
| nvl(C.RCT_7_SING_CNT,0) as RCT_7_SING_CNT, --近7天点唱量
| nvl(C.RCT_7_SUPP_CNT,0) as RCT_7_SUPP_CNT, --近7天点赞量
| nvl(E.RCT_7_TOP_SING_CNT,0) as RCT_7_TOP_SING_CNT, --近7天最高点唱量
| nvl(E.RCT_7_TOP_SUPP_CNT,0) as RCT_7_TOP_SUPP_CNT, --近7天最高点赞量
| nvl(C.RCT_7_USR_CNT,0) as RCT_7_USR_CNT, --近7天点唱用户数
| nvl(C.RCT_7_ORDR_CNT,0) as RCT_7_ORDR_CNT, --近7天点唱订单数
| nvl(D.RCT_30_SING_CNT,0) as RCT_30_SING_CNT, --近30天点唱量
| nvl(D.RCT_30_SUPP_CNT,0) as RCT_30_SUPP_CNT, --近30天点赞量
| nvl(E.RCT_30_TOP_SING_CNT,0) as RCT_30_TOP_SING_CNT, --近30天最高点唱量
| nvl(E.RCT_30_TOP_SUPP_CNT,0) as RCT_30_TOP_SUPP_CNT, --近30天最高点赞量
| nvl(D.RCT_30_USR_CNT,0) as RCT_30_USR_CNT, --近30天点唱用户数
| nvl(D.RCT_30_ORDR_CNT,0) as RCT_30_ORDR_CNT --近30天点唱订单数
|from
| currentDayTable A
| JOIN TW_SONG_BASEINFO_D B ON A.NBR = B.NBR
| LEFT JOIN pre7DayTable C ON A.NBR = C.NBR
| LEFT JOIN pre30DayTable D ON A.NBR = D.NBR
| LEFT JOIN pre7And30DayInfoTable E ON A.NBR = E.NBR
""".stripMargin).createTempView("result")
将结果写入到表TW_SONG_FTUR_D
中
xxxxxxxxxx
sparkSession.sql(
s"""
| insert overwrite table tw_song_ftur_d partition(data_dt=${analyticDate}) select * from result
""".stripMargin)
println("**** all finished ****")