大数据全系列 教程
1869个小节阅读:467.3k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
对应的处理数据的scala文件:GenerateTwUsrBaseinfoD.scala
xxxxxxxxxx
package com.itbaizhan.scala.musicproject.eds
import com.itbaizhan.scala.musicproject.common.{ConfigUtils, DateUtils}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
object GenerateTwUsrBaseinfoD {
private var sparkSession:SparkSession = _
def main(args: Array[String]): Unit = {
//1. 首先判断是否传递参数,没有参数提示:请输入数据日期,格式例如:年月日(20301010)
if(args.length<1){
println("请输入处理数据的日期,格式为年月日(如20301010)")
System.exit(1)
}
val currentDate = args(0)
//2. 判断本地运行还是集群运行,分别构建不同参数的SparkSession对象
if(ConfigUtils.LOCAL_RUN){
sparkSession = SparkSession.builder()
.master("local")
.appName("GenerateTwUsrBaseinfoD")
.config("spark.sql.shuffle.partitions","1")
.config("hive.metastore.uris",ConfigUtils.HIVE_METASTORE_URIS)
.enableHiveSupport().getOrCreate()
sparkSession.sparkContext.setLogLevel("Error")
}else{
sparkSession = SparkSession.builder()
.appName("GenerateTwUsrBaseinfoD")
.enableHiveSupport().getOrCreate()
}
//3. 指定SparkSQL使用的Hive的库`baizhan_music`
sparkSession.sql(s"use ${ConfigUtils.HIVE_DATABASE}")
//4. 获取微信全量用户信息`TO_YCAK_USR_WX_D` ,返回DataFrame对象usrWx
// > REG_CHNL固定值1 -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道
// > WX_ID AS REF_UID, --微信账号
// > substring(REG_TM,1,8) AS REG_DT, --注册日期
// > substring(REG_TM,9,6) AS REG_TM, --注册时间
// > "2" AS USR_TYPE, --用户类型 1-企业 2-个人(不存在该列,指定固定值)
// > NULL AS IS_CERT, --实名认证
// > NULL AS IS_STDNT --是否是学生
val usrWx = sparkSession.sql(
s"""
|SELECT UID, --用户ID
|REG_MID, --注册机器ID
|"1" AS REG_CHNL,--注册渠道
|WX_ID AS REF_UID,--微信ID->渠道ID
|GDR, --性别
|BIRTHDAY, --生日
|MSISDN, --手机号码
|LOC_ID, --地区ID
|LOG_MDE, --注册登录模式
|substring(REG_TM,1,8) AS REG_DT, --注册日期
|substring(REG_TM,9,6) AS REG_TM, --注册时间
|USR_EXP, --用户当前经验值
|SCORE, --累计积分
|LEVEL, --用户等级
|"2" as USR_TYPE,--用户类型 1企业,2个人
|NULL AS IS_CERT,--是否实名认证
|NULL AS IS_STDNT --是否是学生
|FROM TO_YCAK_USR_WX_D
|""".stripMargin)
//5. 获取支付宝用户全量信息`TO_YCAK_USR_ALI_D`,返回DataFrame对象usrAli
// > REG_CHNL固定值2 -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道
// > ALY_ID AS REF_UID, --支付宝账号
// > substring(REG_TM,1,8) AS REG_DT, --注册日期
// > substring(REG_TM,9,6) AS REG_TM, --注册时间
// > NVL(USR_TYPE,"2") AS USR_TYPE, --用户类型 1-企业 2-个人
val usrAli = sparkSession.sql(
s"""
|SELECT UID, --用户ID
|REG_MID, --注册机器ID
|"2" AS REG_CHNL,--注册渠道
|ALY_ID AS REF_UID, --支付宝ID>渠道ID
|GDR, --性别
|BIRTHDAY, --生日
|MSISDN, --手机号码
|LOC_ID, --地区ID
|LOG_MDE, --注册登录模式
|substring(REG_TM,1,8) AS REG_DT, --注册日期
|substring(REG_TM,9,6) AS REG_TM, --注册时间
|USR_EXP, --用户当前经验值
|SCORE, --累计积分
|LEVEL, --用户等级
|NVL(USR_TYPE,"2"), --用户类型,1公司账户,2个人账户
|IS_CERT, --实名认证
|IS_STDNT --是否学生
|FROM TO_YCAK_USR_ALI_D
|""".stripMargin)
//6. 获取QQ 用户全量信息`TO_YCAK_USR_QQ_D`,返回DataFrame对象usrQQ
// > REG_CHNL固定值3 -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道
// > QQID AS REF_UID, --QQ账号
// > substring(REG_TM,1,8) AS REG_DT, --注册日期
// > substring(REG_TM,9,6) AS REG_TM, --注册时间
// > "2" AS USR_TYPE, --用户类型 1-企业 2-个人(不存在该列,指定固定值)
// > NULL AS IS_CERT, --实名认证
// > NULL AS IS_STDNT --是否是学生
val usrQQ = sparkSession.sql(
s"""
|SELECT UID, --用户ID
|REG_MID, --注册机器ID
|"3" AS REG_CHNL,--注册渠道
|QQID AS REF_UID,--QQID->渠道ID
|GDR, --性别
|BIRTHDAY, --生日
|MSISDN, --手机号码
|LOC_ID, --地区ID
|LOG_MDE, --注册登录模式
|substring(REG_TM,1,8) AS REG_DT, --注册日期
|substring(REG_TM,9,6) AS REG_TM, --注册时间
|USR_EXP, --用户当前经验值
|SCORE, --累计积分
|LEVEL, --用户等级
|"2" as USR_TYPE,--用户类型 1企业,2个人
|NULL AS IS_CERT,--是否实名认证
|NULL AS IS_STDNT --是否是学生
|FROM TO_YCAK_USR_QQ_D
|""".stripMargin)
//7. 获取APP用户全量信息 `TO_YCAK_USR_APP_D`,返回DataFrame对象usrApp
// > "4" AS REG_CHNL, -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道
// > APP_ID AS REF_UID, --APP账号
// > NULL AS LOG_MDE, --注册登录方式
// > substring(REG_TM,1,8) AS REG_DT, --注册日期
// > substring(REG_TM,9,6) AS REG_TM, --注册时间
// > 0 AS SCORE, --累计积分
// > "2" AS USR_TYPE, --用户类型 1-企业 2-个人(不存在该列,指定固定值)
// > NULL AS IS_CERT, --实名认证
// > NULL AS IS_STDNT --是否是学生
val usrApp = sparkSession.sql(
s"""
|SELECT UID, --用户ID
|REG_MID, --注册机器ID
|"4" AS REG_CHNL,--注册渠道
|APP_ID AS REF_UID,--QQID->渠道ID
|GDR, --性别
|BIRTHDAY, --生日
|MSISDN, --手机号码
|LOC_ID, --地区ID
|NULL AS LOG_MDE, --注册登录模式
|substring(REG_TM,1,8) AS REG_DT, --注册日期
|substring(REG_TM,9,6) AS REG_TM, --注册时间
|USR_EXP, --用户当前经验值
|0 AS SCORE, --累计积分
|LEVEL, --用户等级
|"2" as USR_TYPE,--用户类型 1企业,2个人
|NULL AS IS_CERT,--是否实名认证
|NULL AS IS_STDNT --是否是学生
|FROM TO_YCAK_USR_APP_D
|""".stripMargin)
//8. 将以上四个DataFrame对象取并集,获取各个平台所有的用户信息
val allUsrInfo = usrWx.union(usrQQ).union(usrAli).union(usrApp)
//9. 从`TO_YCAK_USR_LOGIN_D`用户登录数据增量表,
sparkSession.table("TO_YCAK_USR_LOGIN_D")
//获取指定指定日期内登录的用户`UID `
.where(s"data_dt = $currentDate").select("UID")
//并对`UID`去重,
.distinct()
//然后与所有用户注册信息关联获取当日用户详细信息,
.join(allUsrInfo,Seq("UID"),joinType = "left")
//注册临时视图`TEMP_USR_ACTV`
.createTempView("TEMP_USR_ACTV")
//10. 将以上当日计算得到的活跃用户信息保存至 TW_USR_BASEINFO_D 日增量表中
sparkSession.sql(
s"""
|insert overwrite table TW_USR_BASEINFO_D partition(data_dt=${currentDate})
|select * from TEMP_USR_ACTV
|""".stripMargin)
//11. 获取7天之前的日期
val pre7Date = DateUtils.getCurrentDatePreDate(currentDate, 7)
//12. 获取7日活跃用户信息 保存至 DM 层,保存到MySQL的`songresult`库下的 `user_7days_active`
val prop = new Properties()
prop.setProperty("user",ConfigUtils.MYSQL_USER)
prop.setProperty("password",ConfigUtils.MYSQL_PASSWORD)
prop.setProperty("driver","com.mysql.jdbc.Driver")
// > 使用case when语句将REG_CHNL数值做如下转换
// > 1->微信,2->支付宝,3->QQ,4->APP,其它->未知
// > 用case when语句将GDR数值做如下转换:
// > 1->男 2->女 其它->不明
sparkSession.sql(
s"""
|select A.UID,--用户id
| CASE WHEN B.REG_CHNL = '1' THEN '微信'
| WHEN B.REG_CHNL = '2' THEN '支付宝'
| WHEN B.REG_CHNL = '3' THEN 'QQ'
| WHEN B.REG_CHNL = '4' THEN 'APP'
| ELSE '未知' END REG_CHNL,--注册渠道
| B.REF_UID,--账号ID
| CASE WHEN B.GDR = '1' THEN '男'
| WHEN B.GDR = '2' THEN '女'
| ELSE '不明' END GDR,--性别
| B.BIRTHDAY,--生日
| B.MSISDN,--手机号码
| B.REG_DT,--注册日期
| B.LEVEL --用户等级
|from (select UID,count(*) as cnt
| from TW_USR_BASEINFO_D
| where data_dt between ${pre7Date} and ${currentDate}
| group by UID
| having cnt >= 1) A,TW_USR_BASEINFO_D B
|where B.data_dt=${currentDate} and A.UID = B.UID
|""".stripMargin)
//.show(100)
.write.mode(SaveMode.Overwrite)
.jdbc(ConfigUtils.MYSQL_URL,"user_7days_active",prop)
println("req3 all finished!")
}
}