大数据全系列 教程
1869个小节阅读:467k
目录
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
###5.4.7 nextKeyValue()方法:
点击context.nextKeyValue()方法,进入到接口TaskInputOutputContext中:
xxxxxxxxxx
public boolean nextKeyValue() throws IOException, InterruptedException;
选中方法nextKeyValue()->Ctrl+Alt+B->MapContextImpl类的nextKeyValue()方法:
xxxxxxxxxx
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
点击:reader.nextKeyValue()->RecodeReader类中:
xxxxxxxxxx
public abstract
boolean nextKeyValue() throws IOException, InterruptedException;
选中nextKeyValue()->LineRecordReader的nextKeyValue()方法:
xxxxxxxxxx
//判断是否还存在未处理的键值对
//LineRecordReader类的注释:Treats keys as offset in file and value as line
//key是偏移量
//value:当前行的内容
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
//读取当前行的内容赋值给value
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
//将偏移量改变
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
LineRecordReader类的中getCurrentKey()和getCurrentValue():
xxxxxxxxxx
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
接着调用自定义Mapper类中的map方法:
xxxxxxxxxx
//文本中的每一行内容调用一次map方法
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
Thread.sleep(99999999);
if(value!=null) {
//value就是读取到的当前行中的内容,并将之转换为字符串
String lineContenct = value.toString();
//将字符串进行处理:先去掉两端的空格,然后在按照空格进行切分
String[] words = lineContenct.trim().split(" ");
//遍历数组,逐一进行输出
for(String word:words){
//将word内容封装到keyOut中
keyOut.set(word);
//将kv对输出
context.write(keyOut, valueOut);
}
}
}