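By setting a breakpoint in the WeatherRedcuer class and debugging, we find that the value of key changes while values is being iterated.
First, in the ReduceTask class: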
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(....){
    try {
        reducer.run(reducerContext);
    } finally {
        trackedRW.close(reducerContext);
    }
}
Step into the run method:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKey()) {
            // call the reduce method of the custom Reducer class
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
                ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
            }
        }
    } finally {
        cleanup(context);
    }
}
context.getValues() is declared in the ReduceContext interface:
public Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
Press Ctrl+Alt+B (go to implementation) to reach ReduceContextImpl:
public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
        extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
        implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ......
    private ValueIterable iterable = new ValueIterable();
    ......
    protected class ValueIterable implements Iterable<VALUEIN> {
        private ValueIterator iterator = new ValueIterator();
        public Iterator<VALUEIN> iterator() {
            return iterator;
        }
    }
    public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
        return iterable;
    }
}
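One consequence of the code above is that ValueIterable.iterator() returns the same ValueIterator object on every call. The following is a minimal sketch of that behavior in plain Java, without any Hadoop dependency. The class names SharedIteratorIterable and SharedIteratorDemo are hypothetical and exist only for illustration, and the sketch ignores the backup-store (mark/reset) path of the real iterator.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ValueIterable: it hands out ONE shared iterator.
class SharedIteratorIterable<T> implements Iterable<T> {
    private final Iterator<T> iterator;

    SharedIteratorIterable(List<T> source) {
        this.iterator = source.iterator();
    }

    @Override
    public Iterator<T> iterator() {
        // Same object on every call, as in ValueIterable.iterator()
        return iterator;
    }
}

class SharedIteratorDemo {
    // Counts how many elements one for-each loop over the iterable sees.
    static <T> int countPass(Iterable<T> values) {
        int n = 0;
        for (T ignored : values) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Iterable<Integer> values =
                new SharedIteratorIterable<>(Arrays.asList(31, 28, 35));
        System.out.println("first pass:  " + countPass(values));  // sees all 3 values
        System.out.println("second pass: " + countPass(values));  // sees 0, already consumed
    }
}
```

In this simplified model a second loop over the values sees nothing, because the shared iterator is already exhausted. If the values are needed more than once, one option is to copy them into a collection during the first pass.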
Step into the ValueIterator class:
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
    private boolean inReset = false;
    private boolean clearMarkFlag = false;
    public boolean hasNext() {
        try {
            if (inReset && backupStore.hasNext()) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("hasNext failed", e);
        }
        return firstValue || nextKeyIsSame;
    }
    public VALUEIN next() {
        if (inReset) {
            try {
                if (backupStore.hasNext()) {
                    backupStore.next();
                    DataInputBuffer next = backupStore.nextValue();
                    buffer.reset(next.getData(), next.getPosition(), next.getLength()
                        - next.getPosition());
                    value = valueDeserializer.deserialize(value);
                    return value;
                } else {
                    inReset = false;
                    backupStore.exitResetMode();
                    if (clearMarkFlag) {
                        clearMarkFlag = false;
                        isMarked = false;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException("next value iterator failed", e);
            }
        }
        // if this is the first record, we don't need to advance
        if (firstValue) {
            firstValue = false;
            return value;
        }
        // if this isn't the first record and the next key is different, they
        // can't advance it here.
        if (!nextKeyIsSame) {
            throw new NoSuchElementException("iterate past last value");
        }
        // otherwise, go to the next key/value pair
        try {
            nextKeyValue();
            return value;
        } catch (IOException ie) {
            throw new RuntimeException("next value iterator failed", ie);
        } catch (InterruptedException ie) {
            // this is bad, but we can't modify the exception list of java.util
            throw new RuntimeException("next value iterator interrupted", ie);
        }
    }
    ......
}
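nextKeyValue(): each time the values iterator advances, key is assigned again as well (the existing key object is passed to the deserializer and refilled). That is why the value of key changes during the iteration.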
public boolean nextKeyValue() throws IOException, InterruptedException {
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);
    ......
}
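The effect traced above can be reproduced with a small simulation in plain Java, without Hadoop. This is only a sketch under stated assumptions: DemoKey, ReusedKeyGroup and KeyReuseDemo are hypothetical names, the records are made-up sample data, and all records are assumed to belong to one reduce group. It models just the firstValue check and the refilling of a single key object, not the backup store or real deserialization.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable key, standing in for a Writable key object.
class DemoKey {
    int day;
    int temperature;

    @Override
    public String toString() {
        return day + ":" + temperature;
    }
}

// Simplified model of one reduce group: a single key object is reused,
// and advancing the values overwrites its fields.
class ReusedKeyGroup implements Iterator<Integer> {
    private final int[][] records;             // {day, temperature}, same group
    private int next = 0;                      // index of the next unread record
    private final DemoKey key = new DemoKey(); // single instance, reused
    private int value;
    private boolean firstValue = true;

    ReusedKeyGroup(int[][] records) {
        if (records.length == 0) {
            throw new IllegalArgumentException("need at least one record");
        }
        this.records = records;
        nextKeyValue(); // position on the first record before iteration starts
    }

    DemoKey getCurrentKey() {
        return key;
    }

    // Mirrors the idea of nextKeyValue(): refill the SAME key object, then the value.
    private void nextKeyValue() {
        key.day = records[next][0];
        key.temperature = records[next][1];
        value = records[next][1];
        next++;
    }

    @Override
    public boolean hasNext() {
        return firstValue || next < records.length;
    }

    @Override
    public Integer next() {
        if (firstValue) {          // first record is already loaded
            firstValue = false;
            return value;
        }
        if (next >= records.length) {
            throw new NoSuchElementException("iterate past last value");
        }
        nextKeyValue();
        return value;
    }
}

class KeyReuseDemo {
    // Fetches the key once, then records its content after each value is read.
    static List<String> observeKeys(ReusedKeyGroup group) {
        List<String> seen = new ArrayList<>();
        DemoKey key = group.getCurrentKey();
        while (group.hasNext()) {
            group.next();
            seen.add(key.toString()); // same reference, different content
        }
        return seen;
    }

    public static void main(String[] args) {
        ReusedKeyGroup group =
                new ReusedKeyGroup(new int[][] {{1, 38}, {2, 36}, {3, 35}});
        System.out.println(observeKeys(group)); // prints [1:38, 2:36, 3:35]
    }
}
```

The key reference never changes, yet its content follows the record currently being read. To keep a key beyond the current iteration step, copy its fields into a new object rather than storing the reference.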