大数据全系列 教程
1869个小节阅读:467.1k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
com.itbaizhan.scala.musicproject.eds.user.GenerateTwCnsmBriefD.scala
xxxxxxxxxx
package com.itbaizhan.scala.musicproject.eds.user
import com.itbaizhan.scala.musicproject.common.ConfigUtils
import com.itbaizhan.scala.musicproject.eds.machine.GenerateTwMacLocD.sparkSession
import org.apache.spark.sql.SparkSession
object GenerateTwCnsmBriefD {
private var sparkSession:SparkSession = _
def main(args: Array[String]): Unit = {
//参数合法性验证
if (args.length < 1) {
println("请输入数据日期,格式为年月日(20301004)")
System.exit(1)
}
//接收处理日期
val analyticDate = args(0)
//根据是否本地运行,构建SparkSession对象
if (ConfigUtils.LOCAL_RUN) { //本地运行
sparkSession = SparkSession.builder()
.master("local")
.appName("GenerateTwCnsmBriefD")
.config("spark.sql.shuffle.partitions", "1")
.config("hive.metastore.uris", ConfigUtils.HIVE_METASTORE_URIS)
.enableHiveSupport()
.getOrCreate()
sparkSession.sparkContext.setLogLevel("Error")
} else { //集群运行
sparkSession = SparkSession.builder()
.appName("GenerateTwCnsmBriefD")
.enableHiveSupport()
.getOrCreate()
}
//切换Hive的命名空间
sparkSession.sql(s"use ${ConfigUtils.HIVE_DATABASE}")
//查询分析
sparkSession.sql(
s"""
|select ID,--ID
| TRD_ID,--第三方交易编号
| cast(UID as string) as UID,--用户ID
| MID,--机器ID
| PRDCD_TYPE,--产品类型
| PAY_TYPE,--支付类型
| ACT_TM,--消费时间
| PKG_ID,--套餐ID
| case when AMT<0 then AMT*-1
| else AMT end as COIN_PRC,--币值
| 1 as COIN_CNT,--币数
| ACT_TM as UPDATE_TM,--状态更新时间
| ORDR_ID,--订单ID
| ACTV_NM,--优惠活动名称
| PKG_PRC,--套餐原价
| PKG_DSCNT,--套餐优惠价
| CPN_TYPE,--优惠券类型
| case when ORDR_TYPE = 1 then 0
| when ORDR_TYPE = 2 then 1
| when ORDR_TYPE = 3 then 2
| when ORDR_TYPE = 2 then 2
| end as ABN_TYP--异常类型 0无异常 1异常 2商家退款
|from TO_YCAK_CNSM_D
|where data_dt = ${analyticDate}
|""".stripMargin).createTempView("TEMP_RESULT")
//将临时视图TEMP_RESULT中的数据插入到Hive的tw_cnsm_brief_d中
sparkSession.sql(
s"""
|insert overwrite table TW_CNSM_BRIEF_D
|partition(data_dt=${analyticDate})
|select * from TEMP_RESULT
|""".stripMargin)
println("*****all finished*****")
}
}
配置分析日期为20301004执行程序
node4的hive客户端查询:
xxxxxxxxxx
hive> select count(*) from TW_CNSM_BRIEF_D;
OK
_c0
22512
Time taken: 13.78 seconds, Fetched: 1 row(s)