大数据全系列 教程
1869个小节阅读:464.8k
目录
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
xxxxxxxxxx
def saveOrWriteDate(configuration: Configuration,result:DataFrame,tableName:String): Unit ={
configuration.set(TableOutputFormat.OUTPUT_TABLE,tableName)
val job = new Job(configuration)
// ImmutableBytesWritable 理解为 hbase表中得rowkey (ImmutableBytesWritable,restult)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
// RDD[row]
val kvRDD: RDD[(ImmutableBytesWritable, Put)] = result.rdd.mapPartitions(iterator => {
val newItema = new ArrayBuffer[(ImmutableBytesWritable, Put)]()
while (iterator.hasNext) {
val row: Row = iterator.next()
// row.getString(0) rowkey
val put = new Put( Bytes.toBytes(row.getString(0)) )
// 1. 设置订单id
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("ORDER_ID"), Bytes.toBytes(row.getAs[String]("ORDER_ID")))
// 2. 设置城市得id
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("CITY_ID"), Bytes.toBytes(row.getAs[String]("CITY_ID")))
// 3. 经度
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("STARTING_LNG"), Bytes.toBytes(row.getAs[String]("STARTING_LNG")))
// 4. 维度
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("STARTING_LAT"), Bytes.toBytes(row.getAs[String]("STARTING_LAT")))
// 添加到集合中
newItema+=((new ImmutableBytesWritable, put))
}
Iterator(newItema)
}).flatMap(kv => kv)
HbaseUtil.createTable(tableName,"f1")
kvRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
xxxxxxxxxx
saveOrWriteDate(conf,groupJoinDF,"VIRTUAL_STATION")