大数据全系列 教程
1869个小节阅读:467.6k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
创建练习数据
xxxxxxxxxx
#创建student表列族为f1
create 'student','f1'
put 'student', '001','f1:age','18'
put 'student', '001','f1:name','tom'
put 'student', '002','f1:age','28'
put 'student', '002','f1:name','jerry'
从HBase表加载数据和保存数据至HBase表的Relation实现
xxxxxxxxxx
package com.itbaizhan.hbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.hadoop.hbase.util.{Base64, Bytes}
/**
* 自定义外部数据源:从HBase表加载数据和保存数据至HBase表的Relation实现
*/
class HBaseRelation(context: SQLContext, params: Map[String, String], userSchema: StructType) extends BaseRelation
with TableScan with InsertableRelation with Serializable {
// 连接HBase数据库的属性名称
val HBASE_ZK_QUORUM_KEY: String = "hbase.zookeeper.quorum"
// 地址
val HBASE_ZK_QUORUM_VALUE: String = "zkHosts"
// 端口号key
val HBASE_ZK_PORT_KEY: String = "hbase.zookeeper.property.clientPort"
// 端口号value
val HBASE_ZK_PORT_VALUE: String = "zkPort"
// 表名
val HBASE_TABLE: String = "hbaseTable"
// 列族
val HBASE_TABLE_FAMILY: String = "family"
// 列分隔符
val SPERATOR: String = ","
// 表头信息
val HBASE_TABLE_SELECT_FIELDS: String = "selectFields"
/**
* 表示SparkSQL加载数据和保存程序入口,相当于SparkSession
*/
override def sqlContext: SQLContext = context
/**
* 在SparkSQL中数据封装在DataFrame或者Dataset中Schema信息
*/
override def schema: StructType = userSchema
/**
* 从数据源加载数据,封装至RDD中,每条数据在Row中,结合schema信息,转换为DataFrame
*/
override def buildScan(): RDD[Row] = {
//TODO 1. 读取配置信息,加载HBaseClient配置(主要ZK地址和端口号)
val conf: Configuration = HBaseConfiguration.create()
conf.set(HBASE_ZK_QUORUM_KEY, params(HBASE_ZK_QUORUM_VALUE))
conf.set(HBASE_ZK_PORT_KEY, params(HBASE_ZK_PORT_VALUE))
//TODO 2. 设置表的名称
conf.set(TableInputFormat.INPUT_TABLE, params(HBASE_TABLE))
// TODO: 设置读取列簇和列名称
val scan: Scan = new Scan()
// 设置列簇
val cfBytes: Array[Byte] = Bytes.toBytes(params(HBASE_TABLE_FAMILY))
scan.addFamily(cfBytes)
//TODO 设置列
val fields: Array[String] = params(HBASE_TABLE_SELECT_FIELDS).split(",")
fields.foreach { field =>
scan.addColumn(cfBytes, Bytes.toBytes(field))
}
//TODO 设置Scan过滤数据: 将Scan对象转换为String
conf.set(
TableInputFormat.SCAN, //
Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
)
//TODO 3. 从HBase表加载数据
val datasRDD: RDD[(ImmutableBytesWritable, Result)] = sqlContext.sparkContext
.newAPIHadoopRDD(
conf, //
classOf[TableInputFormat], //输入格式
classOf[ImmutableBytesWritable], //ImmutableBytesWritable就是一种数据类型,可以转为String
classOf[Result] //结果集
)
//TODO 4. 解析获取HBase表每行数据Result,封装至Row对象中
val rowsRDD: RDD[Row] = datasRDD.map { case (_, result) =>
// 基于列名称获取对应的值
val values: Seq[String] = fields.map { field =>
if (field.equals("id")) {
// 转换rowkey
Bytes.toString(result.getRow)
} else {
// 传递列名称和列簇获取value值
val value: Array[Byte] = result.getValue(cfBytes, Bytes.toBytes(field))
// 转换为字符串
Bytes.toString(value)
}
}
// 将Seq序列转换为Row对象
Row.fromSeq(values)
}
//TODO 5. 返回RDD[Row]
rowsRDD
}
/**
* 将DataFrame数据保存至数据源
*
* @param data 数据集
* @param overwrite 是否覆写
*/
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
//1. 设置HBase依赖Zookeeper相关配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set(HBASE_ZK_QUORUM_KEY, params(HBASE_ZK_QUORUM_VALUE))
conf.set(HBASE_ZK_PORT_KEY, params(HBASE_ZK_PORT_VALUE))
//2. 数据写入表的名称
conf.set(TableOutputFormat.OUTPUT_TABLE, params(HBASE_TABLE))
//3. 将DataFrame中数据转换为RDD[(RowKey, Put)]
val cfBytes: Array[Byte] = Bytes.toBytes(params(HBASE_TABLE_FAMILY))
val columns: Array[String] = data.columns
//4. 从DataFrame中获取列名称
val datasRDD: RDD[(ImmutableBytesWritable, Put)] = data.rdd.map { row =>
// row 每行数据 转换为 二元组(RowKey, Put)
//a. 获取RowKey值
val rowKey: String = row.getAs[String]("id")
val rkBytes: Array[Byte] = Bytes.toBytes(rowKey)
//b. 构建Put对象
val put: Put = new Put(rkBytes)
//c. 设置列值
columns.foreach { column =>
val value = row.getAs[String](column)
put.addColumn(cfBytes, Bytes.toBytes(column), Bytes.toBytes(value))
}
//d. 返回二元组
(new ImmutableBytesWritable(rkBytes), put)
}
//4. 保存RDD数据至HBase表中
datasRDD.saveAsNewAPIHadoopFile(s"datas/hbase/output-${System.nanoTime()}",
classOf[ImmutableBytesWritable],
classOf[Put],
classOf[TableOutputFormat[ImmutableBytesWritable]],
conf
)
}
}