大数据全系列 教程
1869个小节阅读:467.6k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
创建类com.itbaizhan.scala.musicproject.eds.machine.GenerateTwMacStatD
构建SparkSession对象
接收处理数据日期参数的合法性验证
xxxxxxxxxx
if(args.length < 1) {
println(s"请输入数据日期,格式例如:年月日(20301004)")
System.exit(1)
}
val analyticDate = args(0)
切换Hive的命名空间
分别从以下四个EDS层的数据表中查询指定日期的数据,并创建同名的临时视图
TW_MAC_BASEINFO_D
、TW_USR_BASEINFO_D
、TW_MAC_LOC_D
、TW_CNSM_BRIEF_D
xxxxxxxxxx
//获取当日的 TW_MAC_BASEINFO_D 机器基本信息
sparkSession.table("TW_MAC_BASEINFO_D").where(s"data_dt = ${analyticDate}")
.createTempView("TW_MAC_BASEINFO_D")
//获取当日的 TW_MAC_LOC_D 机器位置信息表数据
sparkSession.table("TW_MAC_LOC_D").where(s"data_dt = ${analyticDate}")
.createTempView("TW_MAC_LOC_D")
//获取当日的 TW_CNSM_BRIEF_D 消费退款订单流水日增量表
sparkSession.table("TW_CNSM_BRIEF_D").where(s"data_dt = ${analyticDate}")
.createTempView("TW_CNSM_BRIEF_D")
//获取当日的 TW_USR_BASEINFO_D 活跃用户基础信息日增量表
sparkSession.table("TW_USR_BASEINFO_D").where(s"data_dt = ${analyticDate}")
.createTempView("TW_USR_BASEINFO_D")
根据4得到的TW_CNSM_BRIEF_D
进行机器营收情况统计,并注册临时视图TEMP_REV
注意:这里获取 ABN_TYP = 0 的数据,就是正常订单数据。同时按照MID,PKG_ID,PAY_TYPE分组
xxxxxxxxxx
sparkSession.sql(
"""
| select
| MID, --机器ID
| PKG_ID, --套餐ID
| PAY_TYPE, --支付类型
| COUNT(DISTINCT UID) as CNSM_USR_CNT, --总消费用户数
| SUM(COIN_CNT * COIN_PRC) as TOT_REV, --总营收
| COUNT(ORDR_ID) as REV_ORDR_CNT --总营收订单数
| from TW_CNSM_BRIEF_D
| where ABN_TYP = 0
| group by MID,PKG_ID,PAY_TYPE
""".stripMargin).createTempView("TEMP_REV")
根据4得到的TW_CNSM_BRIEF_D
进行机器机器退款统计,并注册临时视图TEMP_REF
注意:这里获取 ABN_TYP = 2 的数据,就是退款订单数据。同时按照MID,PKG_ID,PAY_TYPE分组
xxxxxxxxxx
sparkSession.sql(
"""
| select
| MID, --机器ID
| PKG_ID, --套餐ID
| PAY_TYPE, --支付类型
| COUNT(DISTINCT UID) as REF_USR_CNT, --总退款用户数
| SUM(COIN_CNT * COIN_PRC) as TOT_REF, --总退款
| COUNT(ORDR_ID) as REF_ORDR_CNT --总退款订单数
| from TW_CNSM_BRIEF_D
| where ABN_TYP = 2
| group by MID,PKG_ID,PAY_TYPE
""".stripMargin).createTempView("TEMP_REF")
根据4得到的TW_USR_BASEINFO_D
进行每台
机器新注册用户数统计,并注册临时视图TEMP_USR_NEW
xxxxxxxxxx
sparkSession.sql(
s"""
|select
| REG_MID as MID, --机器ID
| count(UID) as NEW_USR_CNT --新增用户个数
|from TW_USR_BASEINFO_D
|where REG_DT = ${analyticDate}
|group by REG_MID
""".stripMargin).createTempView("TEMP_USR_NEW")
基于4得到的TW_MAC_BASEINFO_D
和TW_MAC_LOC_D
,以及TEMP_REV
、TEMP_REF
、TEMP_REF
联合查询,统计得到机器日营收情况信息并创建临时视图TEMP_MAC_RESULT
xxxxxxxxxx
sparkSession.sql(
"""
|SELECT
| A.MID, --机器ID
| A.MAC_NM, --机器名称
| A.PRDCT_TYP, --产品类型
| A.STORE_NM, --门店名称
| A.BUS_MODE, --运营模式
| A.PAY_SW, --是否开通移动支付
| A.SCENCE_CATGY, --主场景分类
| A.SUB_SCENCE_CATGY, --子场景分类
| A.SCENE, --主场景
| A.SUB_SCENE, --子场景
| A.BRND, --主场景品牌
| A.SUB_BRND, --子场景品牌
| NVL(B.PRVC,A.PRVC) AS PRVC, --省份
| NVL(B.CTY,A.CTY) AS CTY, --城市
| NVL(B.DISTRICT,A.AREA) AS AREA, --区县
| A.PRTN_NM as AGE_ID, --代理人ID
| A.INV_RATE, --投资人分成比例
| A.AGE_RATE, --代理人、联盟人分成比例
| A.COM_RATE, --公司分成比例
| A.PAR_RATE, --合作方分成比例
| C.PKG_ID, --套餐ID
| C.PAY_TYPE, --支付类型
| NVL(C.CNSM_USR_CNT,0) AS CNSM_USR_CNT,--总消费用户数
| NVL(D.REF_USR_CNT,0) AS REF_USR_CNT, --总退款用户数
| NVL(E.NEW_USR_CNT,0) AS NEW_USR_CNT,--总新增用户数
| NVL(C.REV_ORDR_CNT,0) AS REV_ORDR_CNT,--总营收订单数
| NVL(D.REF_ORDR_CNT,0) AS REF_ORDR_CNT,--总退款订单数
| NVL(C.TOT_REV,0) AS TOT_REV, --总营收
| NVL(D.TOT_REF,0) AS TOT_REF --总退款
|FROM TW_MAC_BASEINFO_D A --机器基础信息
|LEFT JOIN TW_MAC_LOC_D B on A.MID = B.MID--机器当日位置
|LEFT JOIN TEMP_REV C on A.MID = C.MID --机器当日营收信息
|LEFT JOIN TEMP_REF D on A.MID = D.MID
| AND C.MID = D.MID
| AND C.PKG_ID = D.PKG_ID
| AND C.PAY_TYPE = D.PAY_TYPE --机器当日退款信息
|LEFT JOIN TEMP_USR_NEW E on A.MID = E.MID--机器当日新增用户信息
""".stripMargin).createTempView("TEMP_MAC_RESULT")
将数据从TEMP_MAC_RESULT
加载到对应的 EDS层 TW_MAC_STAT_D
分区表中
xxxxxxxxxx
//将数据结果加载到Hive的EDS层的TW_MAC_STAT_D分区表中
sparkSession.sql(
s"""
|insert overwrite table TW_MAC_STAT_D
|partition(data_dt = ${analyticDate})
|select * from TEMP_MAC_RESULT
|""".stripMargin)
将TEMP_MAC_RESULT
结果保存至 mysql的songresult.tm_machine_rev_infos
中,作为DM层结果
xxxxxxxxxx
//将数据结果保存到MySQL的songresult.tm_machine_rev_infos中
val prop = new Properties()
prop.setProperty("user",ConfigUtils.MYSQL_USER)
prop.setProperty("password",ConfigUtils.MYSQL_PASSWORD)
prop.setProperty("driver","com.mysql.jdbc.Driver")
sparkSession.sql(
s"""
|select ${analyticDate} as data_dt,*
|from TEMP_MAC_RESULT
|""".stripMargin).write.mode(SaveMode.Append)
.jdbc(ConfigUtils.MYSQL_URL,"tm_machine_rev_infos",prop)
println("*****all finished*****")