大数据全系列 教程
1869个小节阅读:464.9k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
NewInstallUserRunner类代码如下:
xxxxxxxxxx
package cn.itbaizhan.tranformer.mr;
import cn.itbaizhan.common.EventLogConstants;
import cn.itbaizhan.common.GlobalConstants;
import cn.itbaizhan.tranformer.model.MapWritableValue;
import cn.itbaizhan.tranformer.model.StatsUserDimesion;
import cn.itbaizhan.tranformer.model.TimeOutputValue;
import cn.itbaizhan.util.TimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.List;
public class NewInstallUserRunner implements Tool {
//定义配置文件对象
private Configuration conf = null;
//定义日志对象
private static final Logger LOGGER = Logger.getLogger(NewInstallUserRunner.class);
public static void main(String[] args) {
try {
ToolRunner.run(new Configuration(true),new NewInstallUserRunner(),args);
} catch (Exception e) {
LOGGER.error("分析新增用户作业运行异常:"+e.getMessage());
}
}
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
//处理 日期参数
this.processArgs(conf,args);
Job job = Job.getInstance(conf,"new install user");
job.setJarByClass(NewInstallUserRunner.class);
//数据从hbase中读取
TableMapReduceUtil.initTableMapperJob(
getScans(),//设置查询的表,查询的列,过滤条件的设置(en=e_l,startRow和stopRow)
NewInstallUserMapper.class,//指定Mapper类
StatsUserDimesion.class,//Mapper类输出的key的类型
TimeOutputValue.class,
job,
false
);
//设置Reducer相关
job.setOutputKeyClass(StatsUserDimesion.class);
job.setOutputValueClass(MapWritableValue.class);
job.setReducerClass(NewInstallUserReducer.class);
//由于Reducer类处理的后结果需要写入到MySQL中,所以需要指定OutputFormat类
job.setOutputFormatClass(TransformerOutputFormat.class);
return job.waitForCompletion(true)?0:-1;
}
private List<Scan> getScans() {
//定义一个Scan对象
Scan scan = new Scan();
//获取处理的日期 2030-02-18
String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
//设置查询哪天的数据
//2030-02-18 00:00:00
long startTime = TimeUtil.parseString2Long(date);
scan.withStartRow(Bytes.toBytes(startTime),true);
//2030-02-19 00:00:00
long stopTime = startTime + GlobalConstants.DAY_OF_MILLISECONDS;
scan.withStopRow(Bytes.toBytes(stopTime),false);
//设置en=e_l过滤条件
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
SingleColumnValueFilter valueFilter = new SingleColumnValueFilter(
EventLogConstants.EVENT_LOGS_FAMILY_NAME.getBytes(),//列族
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME.getBytes(),//列描述符
CompareOperator.EQUAL,// 等于
EventLogConstants.EventEnum.LAUNCH.alias.getBytes()//e_l
);
//将valueFilter添加filterList对象中
filterList.addFilter(valueFilter);
//指定查询的列 en,s_time,pl,u_ud,browser,browser_v
String columns[] = {
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
EventLogConstants.LOG_COLUMN_NAME_UUID,
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION
};
//方式一:遍历添加 直观
for(String column:columns){
scan.addColumn(EventLogConstants.EVENT_LOGS_FAMILY_NAME.getBytes(),
column.getBytes());
}
//方式二 开阔眼界
/*MultipleColumnPrefixFilter columnPrefixFilter = getColumnsFilter(columns);
filterList.addFilter(columnPrefixFilter);*/
//将filterList添加到scan对象中
scan.setFilter(filterList);
//指定查询的表名称
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME,
EventLogConstants.HBASE_NAME_EVENT_LOGS.getBytes());
//转换为List<Scan>,并返回
return Arrays.asList(scan);
}
private MultipleColumnPrefixFilter getColumnsFilter(String[] columns) {
int length = columns.length;
byte[][] bytes = new byte[length][];
for(int i =0;i<length;i++){
bytes[i] = columns[i].getBytes();
}
return new MultipleColumnPrefixFilter(bytes);
}
/**解析输入的参数(日期参数,代表要分析的是哪天的数据)
* @param conf 配置文件对象,解析后的日期的值保存到该对象中,后续使用可以从该对象中获取
* @param args 传入的参数
*/
private void processArgs(Configuration conf, String[] args) {
//-d yyyy-MM-dd
String date = null;
for(int i = 0;i<args.length;i++){
if("-d".equals(args[i])&&i+1<args.length){
date = args[i+1];
break;
}
}
//验证日期格式是否符合要求,不符合要求的话,默认使用昨天
if(StringUtils.isBlank(date)|| !TimeUtil.isValidateRunningDate(date)){
date = TimeUtil.getYesterday();
}
//将日期保持到conf对象中
conf.set(GlobalConstants.RUNNING_DATE_PARAMES,date);
}
public void setConf(Configuration conf) {
//设置hadoop和hbase相关的参数
conf.set("mapreduce.framework.name","local");
conf.set("hbase.zookeeper.quorum","node2,node3,node4");
//加载配置文件
conf.addResource("output-collector.xml");
conf.addResource("query-mapping.xml");
conf.addResource("transformer-env.xml");
this.conf = HBaseConfiguration.create(conf);
}
public Configuration getConf() {
return this.conf;
}
}