大数据全系列 教程
1869个小节阅读:467.8k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
xxxxxxxxxx
<dependency>
<groupId>com.itbaizhan</groupId>
<artifactId>car-util</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
xxxxxxxxxx
// 订阅 topic,可以一次订阅多个topic
consumer.subscribe(Collections.singletonList(this.topic));
// 从服务端拉取信息,每次poll()可以拉取多个信息
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
System.out.println("消费到消息数:" + records.count());
if (records.count() > 0) {
// 表不存在创建表
if (!HbaseUtil.tableExists(Constants.HTAB_HAIKOU_ORDER)) {
HbaseUtil.createTable(Constants.HTAB_HAIKOU_ORDER, Constants.DEFAULT_FAMILY);
}
Table table = HbaseUtil.getTable(Constants.HTAB_HAIKOU_ORDER);
List<Put> puts = new ArrayList<>();
// rowkey 设计 订单id + 出发时间作为hbase表得rowkey
String rowkey = "";
if (orderMap.size() > 0) {
orderMap.clear();
}
for (ConsumerRecord<Integer, String> record : records) {
// kafka value结果
String value = record.value();
JSONObject jsonObject = JSON.parseObject(value);
JSONObject data = jsonObject.getJSONObject("data");
// 获取表名
String tableName = jsonObject.getString("table");
// 判断表名是否为空
if (StrUtil.isEmpty(tableName) || !tableName.equals("orders")) {
continue;
}
// 过滤掉不合法得数据行
if (data.size() != 24) {
continue;
}
// rowkey 设计 订单id + 出发时间作为hbase表得rowkey
rowkey = data.getString("order_id") + "_" + data.getString("departure_time").replaceAll("-", "").replaceAll(":", "").replaceAll(" ", "");
// 订单所有数据拿出来
orderMap.put("ORDER_ID", data.getString("order_id"));
orderMap.put("PRODUCT_ID", data.getString("product_id"));
orderMap.put("CITY_ID", data.getString("city_id"));
orderMap.put("DISTRICT", data.getString("district"));
orderMap.put("COUNTY", data.getString("county"));
orderMap.put("TYPE", data.getString("type"));
orderMap.put("COMBO_TYPE", data.getString("combo_type"));
orderMap.put("TRAFFIC_TYPE", data.getString("traffic_type"));
orderMap.put("PASSENGER_COUNT", data.getString("passenger_count"));
orderMap.put("DRIVER_PRODUCT_ID", data.getString("driver_product_id"));
orderMap.put("START_DEST_DISTANCE", data.getString("start_dest_distance"));
orderMap.put("ARRIVE_TIME", data.getString("arrive_time"));
orderMap.put("DEPARTURE_TIME", data.getString("departure_time"));
orderMap.put("PRE_TOTAL_FEE", data.getString("pre_total_fee"));
orderMap.put("NORMAL_TIME", data.getString("normal_time"));
orderMap.put("BUBBLE_TRACE_ID", data.getString("bubble_trace_id"));
orderMap.put("PRODUCT_1LEVEL", data.getString("product_1level"));
orderMap.put("DEST_LNG", data.getString("dest_lng"));
orderMap.put("DEST_LAT", data.getString("dest_lat"));
orderMap.put("STARTING_LNG", data.getString("starting_lng"));
orderMap.put("STARTING_LAT", data.getString("starting_lat"));
orderMap.put("YEAR", data.getString("year"));
orderMap.put("MONTH", data.getString("month"));
orderMap.put("DAY", data.getString("day"));
puts.add(HbaseUtil.createPut(rowkey, Constants.DEFAULT_FAMILY.getBytes(), orderMap));
}
table.put(puts);
}
log.warn("************** 正常结束 **********");