大数据全系列 教程
1869个小节阅读:465.7k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
创建类:com.itbaizhan.scala.musicproject.dm.content.GenerateTmSingerRsiD.scala
判断参数是否合法
xxxxxxxxxx
def main(args: Array[String]): Unit = {
if(args.length < 1) {
println(s"请输入数据日期,格式例如:年月日(20301011)")
System.exit(1)
}
//后续代码..
}
根据是否本地运行,构造SparkSession对象
xxxxxxxxxx
if(ConfigUtils.LOCAL_RUN){
sparkSession = SparkSession.builder().master("local")
.appName("Generate_TM_Singer_Rsi_D")
.config("spark.sql.shuffle.partitions","1")
.config("hive.metastore.uris",ConfigUtils.HIVE_METASTORE_URIS)
.enableHiveSupport()
.getOrCreate()
sparkSession.sparkContext.setLogLevel("Error")
}else{
sparkSession = SparkSession.builder()
.appName("Generate_TM_Singer_Rsi_D")
.enableHiveSupport().getOrCreate()
}
定义常量,接收日期
xxxxxxxxxx
//输入数据的日期 ,格式 :年月日 yyyymmdd
val currentDate = args(0)
切换Hive的命名空间
xxxxxxxxxx
sparkSession.sql(s"use ${ConfigUtils.HIVE_DATABASE}")
将表TW_SONG_FTUR_D
中的当天的数据转化为DataFrame对象
xxxxxxxxxx
//从表tw_song_ftur_d中读取需要的数据,转换为DataFrame对象
val dataFrame = sparkSession.sql(
s"""
|select data_dt,--日期
| singer1id,--歌手id
| singer1,--歌手名称
| sum(sing_cnt) as sing_cnt,--歌手当日点唱数
| sum(supp_cnt) as supp_cnt,--歌手当日点赞数
| sum(rct_7_sing_cnt) as rct_7_sing_cnt,--歌手近7日点唱数
| sum(rct_7_supp_cnt) as rct_7_supp_cnt,--歌手近7日点赞数
| sum(rct_7_top_sing_cnt) as rct_7_top_sing_cnt,--歌手近7日最高点唱数
| sum(rct_7_top_supp_cnt) as rct_7_top_supp_cnt,--歌手近7日最高点赞数
| sum(rct_30_sing_cnt) as rct_30_sing_cnt,--歌手近30日点唱数
| sum(rct_30_supp_cnt) rct_30_supp_cnt,--歌手近30日点赞数
| sum(rct_30_top_sing_cnt) as rct_30_top_sing_cnt,--歌手近30日最高点唱数
| sum(rct_30_top_supp_cnt) as rct_30_top_supp_cnt--歌手近30日最高点赞数
|from tw_song_ftur_d
|where data_dt = ${currentDate}
|group by data_dt, singer1id, singer1
|""".stripMargin)
导入隐式转换
xxxxxxxxxx
import org.apache.spark.sql.functions._
为dataFrame
对象添加经过公式计算得到的1、7、30日影响力,并注册为临时视图
xxxxxxxxxx
/为dataFrame对象添加进过计算得到的3个列:1、7、30日歌手影响力指数
dataFrame.withColumn("rsi_1d",pow(log(col("sing_cnt")/1+1)*0.63*0.8
+log(col("supp_cnt")/1+1)*0.63*0.2,2)*10)
.withColumn("rsi_7d",pow(
(log(col("rct_7_sing_cnt")/7+1)*0.63+log(col("rct_7_top_sing_cnt")+1)*0.37)*0.8
+
(log(col("rct_7_supp_cnt")/7+1)*0.63+log(col("rct_7_top_supp_cnt")+1)*0.37)*0.2,
2)*10)
.withColumn("rsi_30d",pow(
(log(col("rct_30_sing_cnt")/7+1)*0.63+log(col("rct_30_top_sing_cnt")+1)*0.37)*0.8
+
(log(col("rct_30_supp_cnt")/7+1)*0.63+log(col("rct_30_top_supp_cnt")+1)*0.37)*0.2,
2)*10)
.createOrReplaceTempView("temp_tw_song_ftur_d")
分别从临时视图TEMP_TW_SONG_FTUR_D
中获取1、7、30日影响力对应的DataFrame对象
xxxxxxxxxx
val rsi_1d = sparkSession.sql(
s"""
|select "1" period,singer1id,singer1,rsi_1d as rsi,
| row_number() over(partition by data_dt order by rsi_1d desc) as rsi_rank
|from temp_tw_song_ftur_d
|""".stripMargin)
val rsi_7d = sparkSession.sql(
s"""
|select "7" period,singer1id,singer1,rsi_7d as rsi,
| row_number() over(partition by data_dt order by rsi_7d desc) as rsi_rank
|from temp_tw_song_ftur_d
|""".stripMargin)
val rsi_30d = sparkSession.sql(
s"""
|select "30" period,singer1id,singer1,rsi_30d as rsi,
| row_number() over(partition by data_dt order by rsi_30d desc) as rsi_rank
|from temp_tw_song_ftur_d
|""".stripMargin)
取三者的并集后注册临时视图
xxxxxxxxxx
rsi_1d.union(rsi_7d).union(rsi_30d).createTempView("result")
将临时视图result
中的数据保存到Hive的表TW_SONG_RSI_D
中
xxxxxxxxxx
sparkSession.sql(
s"""
|insert overwrite table tw_singer_rsi_d
|partition(data_dt=${currentDate}) select * from result
|""".stripMargin)
将临时视图result
中的数据排名前30的数据保存到MySQL的表tm_song_rsi
中
xxxxxxxxxx
//将result中的数据排名前30的数据写入到MySQL的表中
val prop = new Properties()
prop.setProperty("user",ConfigUtils.MYSQL_USER)
prop.setProperty("password",ConfigUtils.MYSQL_PASSWORD)
prop.setProperty("driver","com.mysql.jdbc.Driver")
sparkSession.sql(
s"""
|select ${currentDate} as data_dt,period,singer1id,singer1,rsi,rsi_rank
|from result where rsi_rank <= 30
|""".stripMargin).write.mode(SaveMode.Overwrite)
.jdbc(ConfigUtils.MYSQL_URL,"tm_singer_rsi",prop)
println("all finished")