大数据全系列 教程
1869个小节阅读:464.7k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
直接通过NewDirectOutputCollector写到HDFS上
MapTask类的runNewMapper方法中:
MapTask->NewDirectOutputCollector->write()方法:
xxxxxxxxxx
// Writes one (key, value) pair directly through the underlying RecordWriter
// (the direct-output path used when the job has zero reducers — no sort/spill buffer).
public void write(K key, V value)
throws IOException, InterruptedException {
// Tell the framework the task is still making progress (avoids task timeout).
reporter.progress();
// Snapshot filesystem byte statistics before and after the write so the
// counter reflects only the bytes this record produced.
long bytesOutPrev = getOutputBytes(fsStats);
out.write(key, value);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
// One more map output record emitted.
mapOutputRecordCounter.increment(1);
}
分析 out 对象的来源(即 RecordWriter 是如何创建的):
xxxxxxxxxx
// Constructor: wires up counters, filesystem statistics, and — crucially —
// obtains the RecordWriter ("out") from the job's configured OutputFormat,
// which is what write() above delegates to.
NewDirectOutputCollector(MRJobConfig jobContext,
JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throws IOException, ClassNotFoundException, InterruptedException {
this.reporter = reporter;
mapOutputRecordCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
// Only file-based output formats expose per-filesystem statistics;
// for other formats fsStats stays null and byte counting is a no-op.
List<Statistics> matchedStats = null;
if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.getOutputPath(taskContext), taskContext.getConfiguration());
}
fsStats = matchedStats;
// Creating the RecordWriter may itself write bytes (e.g. file headers),
// so bracket it with statistics snapshots and charge those bytes to the counter.
long bytesOutPrev = getOutputBytes(fsStats);
out = outputFormat.getRecordWriter(taskContext);
long bytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
点击getRecordWriter()->Ctrl+Alt+B->MapFileOutputFormat
xxxxxxxxxx
// Builds a RecordWriter backed by a MapFile.Writer. Each (key, value) appended
// goes to the task's work file on the output filesystem (HDFS when the output
// path is an HDFS path).
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
TaskAttemptContext context) throws IOException {
Configuration conf = context.getConfiguration();
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(context)) {
// find the kind of compression to do
compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
// find the right codec
Class<?> codecClass = getOutputCompressorClass(context,
DefaultCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
}
// Per-task-attempt work file under the output directory.
Path file = getDefaultWorkFile(context, "");
// Resolve the filesystem from the path — writes go straight to HDFS
// when the output path lives on HDFS.
FileSystem fs = file.getFileSystem(conf);
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
new MapFile.Writer(conf, fs, file.toString(),
context.getOutputKeyClass().asSubclass(WritableComparable.class),
context.getOutputValueClass().asSubclass(Writable.class),
compressionType, codec, context);
// Anonymous adapter: RecordWriter API -> MapFile.Writer append/close.
// NOTE: MapFile requires keys to be appended in sorted order — append()
// throws if they are not.
return new RecordWriter<WritableComparable<?>, Writable>() {
public void write(WritableComparable<?> key, Writable value)
throws IOException {
out.append(key, value);
}
public void close(TaskAttemptContext context) throws IOException {
out.close();
}
};
}