大数据全系列 教程
1869个小节阅读:466.8k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
Spark内置很多数据源,却没有HBase的数据源,需要调用rdd的api,如果能有下面这种方式就非常完美了。
xxxxxxxxxx
frame.write.format("hbase")
.mode(SaveMode.Append)
.option(ZK_HOST_HBASE, "itbaizhan.cn")
.option(ZK_PORT_HBASE, 2181)
.option(HBASE_TABLE, "users")
.option(HBASE_FAMILY, "detail")
.option(HBASE_ROW_KEY, "id")
.option(HBASE_SELECT_WHERE_CONDITIONS,"id[ge]50")
.save()
参考spark sql内置的数据源和第三方提供的数据源(如Kudu、Elaticsearch),自定义HBase数据源。
xxxxxxxxxx
package com.itbaizhan.hbase
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* 默认数据源提供Relation对象,分别为加载数据和保存提供Relation对象
*/
class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister{
val SPERATOR: String = ","
val HBASE_TABLE_SELECT_FIELDS: String = "selectFields"
// 使用简称
override def shortName(): String = {
"hbase"
}
/**
* 返回BaseRelation实例对象,提供加载数据功能
* @param sqlContext SQLContext实例对象
* @param parameters 参数信息
* @return
*/
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]
): BaseRelation = {
// 1. 定义Schema信息
val schema: StructType = StructType(
parameters(HBASE_TABLE_SELECT_FIELDS)
.split(SPERATOR)
.map{
field => StructField(field, StringType, nullable = true)
})
// 2. 创建HBaseRelation对象
val relation = new HBaseRelation(sqlContext, parameters, schema)
// 3. 返回对象
relation
}
/*** 返回BaseRelation实例对象,提供保存数据功能
* @param sqlContext SQLContext实例对象
* @param mode 保存模式
* @param parameters 参数
* @param data 数据集
* @return
*/
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {
// 1. 创建HBaseRelation对象
val relation = new HBaseRelation(sqlContext, parameters, data.schema)
// 2.保存数据
relation.insert(data,true)
// 3.返回
relation
}
}