大数据全系列 教程
1869个小节阅读:466.6k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
从hbase读取数据,经过MR计算,将结果存储于hdfs
需求:将sentence表中的数据,经过MR统计单词数量后保存到HDFS上。
wc.txt->sentence表的数据->MR计算->HDFS
创建句子表:sentence
xxxxxxxxxx
hbase(main):002:0> create 'sentence','cf'
创建包com.itbaizhan.hbase2hdfs
编写InsertSentence 类(将wc.txt文件中的内容添加到HBase的sentence表中)
xxxxxxxxxx
package com.itbaizhan.hbase2hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class InsertSentence {
//定义连接对象
private Connection connection;
//定义Table对象
private Table table;
public void before() throws IOException {
//构造conf对象
Configuration conf = HBaseConfiguration.create();
//设置hbase用到的zk集群
conf.set("hbase.zookeeper.quorum","node2,node3,node4");
//获取连接对象
connection = ConnectionFactory.createConnection(conf);
//获取表的DML对象
table = connection.getTable(TableName.valueOf("sentence"));
}
public void close() throws IOException {
if(table!=null){
table.close();
}
if(connection!=null){
connection.close();
}
}
public void insertData() throws Exception {
//从本地读取hello.txt
BufferedReader bufferedReader =
new BufferedReader(new FileReader(
System.getProperty("user.dir")+ File.separator +"hello.txt"
));
//定义变量,表示读取到的当前行的内容
String line = null;
//定义rowkey
int index = 1;
//逐一读取文本中的内容,并写入到Hbase的sentence表中
while((line = bufferedReader.readLine())!=null){
Put put = new Put(Bytes.toBytes(index));
put.addColumn("cf".getBytes(),"line".getBytes(),line.getBytes());
table.put(put);
index++;
}
//关闭本地输入流对象
bufferedReader.close();
}
//优化:1000行数据插入一次
public void insertData2() throws Exception {
//从本地读取hello.txt
BufferedReader bufferedReader =
new BufferedReader(new FileReader(
System.getProperty("user.dir")+ File.separator +"hello.txt"
));
//定义变量,表示读取到的当前行的内容
String line = null;
//定义rowkey
int index = 1;
//定义一个Put集合
List<Put> putList = new ArrayList<>();
//逐一读取文本中的内容,并写入到Hbase的sentence表中
while((line = bufferedReader.readLine())!=null){
Put put = new Put(Bytes.toBytes(index));
put.addColumn("cf".getBytes(),"line".getBytes(),line.getBytes());
//将put对象添加到putList中
putList.add(put);
//当index是1000的整数倍时执行一次批量插入
if(index%1000==0){
table.put(putList);
//清空putList
putList.clear();
}
index++;
}
if(putList.size()!=0){
table.put(putList);
}
//关闭本地输入流对象
bufferedReader.close();
}
public static void main(String[] args) {
//D:\codes\IDEA2021\itbaizhan\HbaseApiDemo
System.out.println(System.getProperty("user.dir"));
System.out.println(File.separator);
}
}