大数据全系列 教程
1869个小节阅读:466.7k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
入口ReduceTask类的run方法中:
xxxxxxxxxx
// Initialize the shuffle plugin with the shuffle context
shuffleConsumerPlugin.init(shuffleContext);
// Run the shuffle plugin; its return value is stored in rIter
rIter = shuffleConsumerPlugin.run();
init方法:
xxxxxxxxxx
/**
 * Excerpt of the shuffle plugin interface, generic in K and V.
 * Only the {@code init} declaration is shown in this excerpt.
 */
public interface ShuffleConsumerPlugin<K, V> {
/**
 * Initializes the plugin from the supplied context.
 *
 * @param context the {@code Context<K, V>} handed to the plugin
 */
public void init(Context<K, V> context);
}
选中init(),Ctrl+Alt+B->Shuffle类:
xxxxxxxxxx
/**
 * Initializes this shuffle instance from the given context: stores the
 * context, copies the values it exposes into fields, creates the metrics
 * object, and then builds the scheduler and the merger.
 *
 * @param context the plugin context the field values are read from
 */
public void init(ShuffleConsumerPlugin.Context context) {
this.context = context;
// Pull the values needed later out of the context object
this.reduceId = context.getReduceId();
this.jobConf = context.getJobConf();
this.umbilical = context.getUmbilical();
this.reporter = context.getReporter();
// Metrics come from a static factory rather than from the context
this.metrics = ShuffleClientMetrics.create();
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
this.localMapFiles = context.getLocalMapFiles();
// Instantiate the scheduler; it is built from fields assigned above
// (jobConf, taskStatus, reduceId, copyPhase), so it must come after them
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
// Create the merger object
merger = createMergeManager(context);
}
run方法:
xxxxxxxxxx
/**
 * Runs the shuffle (declaration only; no body is given here).
 *
 * @return a {@code RawKeyValueIterator}
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
public RawKeyValueIterator run() throws IOException, InterruptedException;
选中run(),Ctrl+Alt+B->Shuffle类:
xxxxxxxxxx
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits until the scheduler reports completion, shuts the fetchers down,
 * switches the task status to the SORT phase, and closes the merger.
 *
 * <p>While waiting, and once more before returning, the {@code throwable}
 * field is checked under this object's monitor; if it is non-null a
 * {@code ShuffleError} wrapping it is thrown.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads; first decide whether this is a
// local run (localMapFiles is non-null)
boolean isLocal = localMapFiles != null;
// For a local run (MapTask and ReduceTask on the same node), a single
// thread is enough for the copy.
// When running on YARN, read the value configured for
// SHUFFLE_PARALLEL_COPIES; 5 is passed as the fallback value.
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
// Allocate slots for 1, 5 (or a custom number of) Fetcher objects.
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {// only one fetcher needs to be started
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
// numFetchers: the configured count, or the fallback of 5
for (int i=0; i < numFetchers; ++i) {
// Instantiate each Fetcher and start it
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();// original note: the file copy itself happens in the Fetcher class's run method
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
// Surface a failure recorded in the throwable field
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
// Any failure from the final merge is rethrown as a ShuffleError, keeping the cause
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}