大数据全系列 教程
1869个小节阅读:464.9k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
com.itbaizhan.scala.musicproject.eds.machine.GenerateTwMacLocD.scala
xxxxxxxxxx
object GenerateTwMacLocD {
val localRun : Boolean = ConfigUtils.LOCAL_RUN
val hiveMetaStoreUris = ConfigUtils.HIVE_METASTORE_URIS
val hiveDataBase = ConfigUtils.HIVE_DATABASE
var sparkSession : SparkSession = _
def main(args: Array[String]): Unit = {
if(localRun){//本地运行
sparkSession = SparkSession.builder().master("local")
.config("hive.metastore.uris",hiveMetaStoreUris)
.config("spark.sql.shuffle.partitions",10)
.enableHiveSupport().getOrCreate()
}else{//集群运行
sparkSession = SparkSession.builder().config("spark.sql.shuffle.partitions",10).enableHiveSupport().getOrCreate()
}
if(args.length < 1) {
println(s"请输入数据日期,格式例如:年月日(20301004)")
System.exit(1)
}
val analyticDate = args(0)
sparkSession.sql(s"use $hiveDataBase ")
sparkSession.sparkContext.setLogLevel("Error")
//此处添加后续代码
}
}
pre30DaysDataFrame
:xxxxxxxxxx
//根据当前输入的日期,获取过去30天的日期
val pre30Date = DateUtils.getCurrentDatePreDate(analyticDate,30)
val pre30DaysDataFrame = sparkSession.sql(
s"""
| select
| UID, --用户
| MID, --机器
| LAT, --纬度
| LNG --经度
| from TO_YCAK_USR_LOC_D
| where data_dt between ${pre30Date} and ${analyticDate}
""".stripMargin)
TEMP_PRE30_MAC_LOC_INFO
:xxxxxxxxxx
pre30DaysDataFrame
.distinct() //重复用户上报的机器位置不计数
.groupBy("MID","LAT","LNG")
.count()
.withColumnRenamed("LAT", "X")//纬度
.withColumnRenamed("LNG", "Y")//经度
.withColumnRenamed("count", "CNT")
.createTempView("TEMP_PRE30_MAC_LOC_INFO")
xxxxxxxxxx
val macLocDF : DataFrame = sparkSession.sql(
"""
|select
| MID, --机器
| X, --纬度
| Y, --经度
| CNT, --出现次数
| row_number() over(partition by MID order by CNT desc) as RANK
|from TEMP_PRE30_MAC_LOC_INFO
""".stripMargin).filter("x != '' and y != '' and RANK = 1")
描述 | 字段名 | 类型 |
---|---|---|
机器ID | MID | int(11) |
纬度 | X | varchar(14) |
经度 | Y | varchar(14) |
出现此时 | CNT | int(11) |
高德API转换得到:
逻辑模型名称 | 机器位置信息日统计表 | 数据类型 |
---|---|---|
机器ID | MID | int(11) |
纬度 | X | varchar(14) |
经度 | Y | varchar(14) |
出现次数 | CNT | int(11) |
地址 | ADDER | varchar(128) |
省份 | PRVC | varchar(64) |
城市 | CTY | varchar(64) |
城市编码 | CTY_CD | varchar(12) |
县区 | DISTRICT | varchar(64) |
县区编码 | AD_CD | varchar(12) |
乡镇街道 | TOWN_SHIP | varchar(64) |
乡镇街道编码 | TOWN_CD | varchar(12) |
社区名称 | NB_NM | varchar(64) |
社区类型 | NB_TP | varchar(64) |
建筑名称 | BD_NM | varchar(64) |
建筑类型 | BD_TP | varchar(64) |
道路名称 | STREET | varchar(64) |
道路门牌号 | STREET_NB | varchar(64) |
道路坐标 | STREET_LOC | varchar(64) |
道路方位 | STREET_DRCTION | varchar(64) |
门牌地址到请求坐标的距离 | STREET_DSTANCE | varchar(64) |
商圈信息 | BUS_INFO | varchar(128) |
xxxxxxxxxx
/**分批次的调用高德API获取对应的机器位置信息
* @param rowList包含了机器坐标信息的List[Row]
* @return 封装具体机器位置新的List[Row]
*/
def getLocInfoFromGaodeAPI(rowList : List[Row]): ListBuffer[Row] ={
val returnLocList = new ListBuffer[Row]()
//获取rowList中的 每条数据的经纬度,并按照 “|”拼接成字符串
var concatYX = ""
for(i <- 0 until rowList.size){
val X = rowList(i).getAs[String]("X") //纬度
val Y = rowList(i).getAs[String]("Y")//经度
concatYX += Y+","+X+"|"
}
//调用高德api,根据经纬度获取对应的地址
val response: HttpResponse[String] = Http("https://restapi.amap.com/v3/geocode/regeo")
.param("key","c2980141d21b8eeb9b66ca59b94744e9")
.param("location",concatYX.substring(0,concatYX.length-1))
.param("batch","true")
.option(HttpOptions.readTimeout(10000)) //获取数据延迟 10s
.asString
val jsonInfo: JSONObject = JSON.parseObject(response.body.toString)
//结果中返回的地址个数
val returnLocLength = JSON.parseArray(jsonInfo.getString("regeocodes")).size()
if("10000".equals(jsonInfo.getString("infocode"))&&rowList.size == returnLocLength) {
//如果 info 返回10000 代表请求成功,并返回了结果
//从返回的json中获取详细地址,对从高德API中查询的数据进行整理,转换成Row类型的数据返回
val jsonArray: JSONArray = JSON.parseArray(jsonInfo.getString("regeocodes"))
for (i <- 0 until rowList.length) {
val mid = rowList(i).getAs[String]("MID").toInt
val x = rowList(i).getAs[String]("X") //纬度
val y = rowList(i).getAs[String]("Y") //经度
val cnt = rowList(i).getAs[Long]("CNT").toInt //出现次数
val currentJsonObject = jsonArray.getJSONObject(i)
val address = StringUtils.checkString(currentJsonObject.getString("formatted_address"))
val addrCom: JSONObject = currentJsonObject.getJSONObject("addressComponent")
val province = StringUtils.checkString(addrCom.getString("province"))
val city = StringUtils.checkString(addrCom.getString("city"))
val citycode = StringUtils.checkString(addrCom.getString("citycode"))
val district = StringUtils.checkString(addrCom.getString("district"))
val adcode = StringUtils.checkString(addrCom.getString("adcode"))
val township = StringUtils.checkString(addrCom.getString("township"))
val towncode = StringUtils.checkString(addrCom.getString("towncode"))
val neighborhoodName = StringUtils.checkString(addrCom.getJSONObject("neighborhood").getString("name"))
val neighborhoodType = StringUtils.checkString(addrCom.getJSONObject("neighborhood").getString("type"))
val buildingName = StringUtils.checkString(addrCom.getJSONObject("building").getString("name"))
val buildingType = StringUtils.checkString(addrCom.getJSONObject("building").getString("type"))
val street = StringUtils.checkString(addrCom.getJSONObject("streetNumber").getString("street"))
val number = StringUtils.checkString(addrCom.getJSONObject("streetNumber").getString("number"))
val location = StringUtils.checkString(addrCom.getJSONObject("streetNumber").getString("location"))
val direction = StringUtils.checkString(addrCom.getJSONObject("streetNumber").getString("direction"))
val distance = StringUtils.checkString(addrCom.getJSONObject("streetNumber").getString("distance"))
val businessAreas = StringUtils.checkString(addrCom.getString("businessAreas"))
returnLocList.append(Row(mid, x, y, cnt, address, province, city, citycode, district, adcode, township, towncode,
neighborhoodName, neighborhoodType, buildingName, buildingType, street, number, location, direction, distance, businessAreas))
}
}
returnLocList
}
xxxxxxxxxx
//封装数据,调用高德API的方法
val rowRDD: RDD[Row] = macLocDF.rdd.mapPartitions(iter => {
val list: List[Row] = iter.toList
val length = list.length
//定义调用高德API之后,返回的详细位置信息
val detailLocList = new ListBuffer[Row]()
//调用高德API方法的次数
var times = 0
//经纬度坐标;最多支持20个坐标点;多个点之间用"|"分割。
if (length % 20 != 0) {
times = length / 20 + 1
} else {
times = length / 20
}
for (i <- 0 until times) {
//slice(m, n) ,提前集合中第m个元素一直到第n-1个
val currentRowList = list.slice(i * 20, (i + 1) * 20)
//调用自定的方法 [1005,30.23,40.232,1,1]
val rows: ListBuffer[Row] = getLocInfoFromeGaoDeAPI(currentRowList)
//将本批次的结果添加到detailLocList集合中
detailLocList.++=(rows)
}
//返回结果
detailLocList.iterator
})
//添加隐式转换
import org.apache.spark.sql.functions._
//定义StructType对象,
val schema = StructType(Array[StructField](
StructField("MID",IntegerType),
StructField("X",StringType),
StructField("Y",StringType),
StructField("CNT",IntegerType),
StructField("ADDER",StringType),
StructField("PRVC",StringType),
StructField("CTY",StringType),
StructField("CTY_CD",StringType),
StructField("DISTRICT",StringType),
StructField("AD_CD",StringType),
StructField("TOWN_SHIP",StringType),
StructField("TOWN_CD",StringType),
StructField("NB_NM",StringType),
StructField("NB_TP",StringType),
StructField("BD_NM",StringType),
StructField("BD_TP",StringType),
StructField("STREET",StringType),
StructField("STREET_NB",StringType),
StructField("STREET_LOC",StringType),
StructField("STREET_DRCTION",StringType),
StructField("STREET_DSTANCE",StringType),
StructField("BUS_INFO",StringType)
))
//当前统计过去30天中机器位置信息
val pre30DaysMacLocInfos = sparkSession.createDataFrame(rowRDD, schema)
xxxxxxxxxx
//从TW_MAC_LOC_D中查询昨天所有机器的位置信息
val pre1Date = DateUtils.getCurrentDatePreDate(analyticDate, 1)
val pre1DateMacLocInfo = sparkSession.table("TW_MAC_LOC_D")
.where(s"data_dt=${pre1Date}")
//取二者差集,前面与后面不同的数据
val diffMid = per1DateMacLocInfo.select("MID")
.except(pre30DaysMacLocInfos.select("MID"))
// 按照mid 左连接 关联per1DateMacLocInfo 获取30天前的机器详细信息然后与当前计算的 最近30天机器信息做union
diffMid.join(per1DateMacLocInfo,Seq("mid"),"left")
.drop(col("data_dt"))
.union(pre30DaysMacLocInfos)
.createTempView("TEMP_ALL_MAC_LOC_INFO")
xxxxxxxxxx
//将临时视图TEMP_ALL_MAC_LOC_INFO中的数据导入到tw_mac_loc_d表中
sparkSession.sql(
s"""
|insert overwrite table tw_mac_loc_d
|partition(data_dt=${analyticDate})
|select * from TEMP_ALL_MAC_LOC_INFO
|""".stripMargin)
println("### all finished###")
配置分析日期为20301004执行程序
node4的hive客户端查询:
xxxxxxxxxx
hive> select count(*) from tw_mac_loc_d;
OK
_c0
758
Time taken: 15.569 seconds, Fetched: 1 row(s)