大数据全系列 教程
1869个小节阅读:467.7k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
需求:
需求:要求使用SparkSQL实现,并给出计算逻辑说明。
读取 vpnlog 日志文件,其中userName为用户名,ts为记录时间,当type为login时,为登入时间,type为logout为登出时间。如果用户一天开始是登出记录,则认为他当日0:0:0登入,如果一天结束时登入日志,则认为他当日23:59:59登出。
问题:
1)计算这一天,各个用户的在线总时长,在线次数,最大在线时长
2)这一天,每个小时在线用户数
数据文件:data/demo/vpnlog.json
补充知识:
replace(列,字符串1,字符串2) :对某列的数据查找字符串1替换成字符串2
xxxxxxxxxx
> SELECT replace('ABCabc', 'abc', 'DEF');
ABCDEF
xxxxxxxxxx
package com.itbaizhan.sql.examples
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.mutable.ListBuffer
object VpnLogDemo {
def main(args: Array[String]): Unit = {
//1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("VpnLogDemo")
.getOrCreate()
//设置日志的级别
spark.sparkContext.setLogLevel("Error")
//3.读取json文件
val df: DataFrame = spark.read.json("data/demo/vpnlog.json")
//df.show(false)
//4.创建临时视图
df.createTempView("temp1")
//5.处理时间2030-04-07T00:12:02.000Z->2030-04-07 00:12:02
/*spark.sql(
"""
|select username,replace(replace(ts,"T"," "),".000Z","") as ts,type
|from temp1
|""".stripMargin).createTempView("temp2")
//6.给表中的数据行做标号
spark.sql(
"""
|select username,ts,type,
| row_number() over(partition by username order by ts) as rownum
|from temp2
|""".stripMargin).createTempView("temp3")*/
//5和6可以合并到一起
spark.sql(
"""
|select username,replace(replace(ts,"T"," "),".000Z","") as ts,type,
| row_number() over(partition by username order by ts) as rownum
|from temp1
|""".stripMargin).createTempView("temp3")
//7.进行自关联,错位匹配
spark.sql(
"""
|select a.username username1,a.ts ts1,a.type type1,a.rownum rownum1,
| b.username username2,b.ts ts2,b.type type2,b.rownum rownum2
|from temp3 a full outer join temp3 b
|on a.username = b.username and a.rownum = b.rownum-1
|order by a.username,a.ts
|""".stripMargin).createTempView("temp4")
//8.数据补全,过滤保留type1='login' and type2='logout'
spark.sql(
"""
|select username1,ts1,type1,username2,ts2,type2
|from (select
| case when username1 is null then username2 else username1 end username1,
| case when ts1 is null then concat(split(ts2," ")[0],' 00:00:00') else ts1 end ts1,
| case when type1 is null then 'login' else type1 end type1,
| case when username2 is null then username1 else username2 end username2,
| case when ts2 is null then concat(split(ts1," ")[0],' 23:59:59') else ts2 end ts2,
| case when type2 is null then 'logout' else type2 end type2
|from temp4) tb_temp
|where type1='login' and type2='logout'
|""".stripMargin).createTempView("temp5")
//9.补充:将yyyy-MM-dd HH:mm:ss 转换为时间戳 单位为秒
/*spark.sql(
"""
|select (unix_timestamp('2030-04-07 22:15:36','yyyy-MM-dd HH:mm:ss')
| -unix_timestamp('2030-04-07 22:14:36','yyyy-MM-dd HH:mm:ss')) dur
|""".stripMargin).show()*/
//10.统计每次登陆的在线时长,单位为秒
spark.sql(
"""
|select username1 username,ts1,ts2,
|(unix_timestamp(ts2,'yyyy-MM-dd HH:mm:ss')-unix_timestamp(ts1,'yyyy-MM-dd HH:mm:ss')) dur
|from temp5
|""".stripMargin).createTempView("temp6")
//11.计算这一天,各个用户的在线总时长,在线次数,最大在线时长
spark.sql(
"""
|select username,sum(dur) totaldur,count(*) totalcount,max(dur) maxdur
|from temp6
|group by username
|""".stripMargin).show(100,false)
//12.自定义udf函数
spark.udf.register("myudf",(ts:String,count:Int)=>{
//定义一个List集合
val list = new ListBuffer[String]()
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val calendar: Calendar = Calendar.getInstance()
//将ts设置到calendar中
calendar.setTime(sdf.parse(ts))
//循环count+1
for(i <- 0 to count){
//将时间字符串添加到List中
list.append(sdf.format(calendar.getTime))
//将ts的小时加1
calendar.add(Calendar.HOUR,1)
}
list
})
//13.使用自定义函数+explode对数据进行一对多的转换
spark.sql(
"""
|select username1,ts1,type1,username2,ts2,type2,
|explode(myudf(ts1,hour(ts2)-hour(ts1))) as tt
|from temp5
|""".stripMargin).createTempView("temp7")
//14.2030-04-07 07:04:36->2030-04-07 07 去重(username,transtime)
spark.sql(
"""
|select distinct username1 username,
| from_unixtime(unix_timestamp(tt,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH') transtime
|from temp7
|order by username,transtime
|""".stripMargin).createTempView("temp8")
//15.分组统计:每个小时在线的用户数
spark.sql(
"""
|select transtime,count(username) as usercount
|from temp8
|group by transtime
|order by transtime desc
|""".stripMargin).show(100,false)
spark.close()
}
}