大数据全系列 教程
1869个小节阅读:468.1k
408考研
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
点击MapTask类的runNewMapper()方法的798行:
xxxxxxxxxx
input.initialize(split, mapperContext);
进入到RecordReader抽象类中:
xxxxxxxxxx
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
选中initialize方法,然后Ctrl+Alt+B->LineRecordReader类的initialize()方法中:
xxxxxxxxxx
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
//起始偏移量=切片的起始偏移量
start = split.getStart();
//结束偏移量=起始偏移量+切片的大小 单位:B
end = start + split.getLength();
//获取切片对应文件的Path路径的对象
final Path file = split.getPath();
// open the file and seek to the start of the split
//获取分布式文件系统对象
final FileSystem fs = file.getFileSystem(job);
//打开文件进行读取
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
//如果start不等于:说明当前MapTask处理的切片不是对应文件的第一个切片,存在行被切断的问题
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {//当前MapTask处理是对应文件的第一个切片,不存在行被切断的风险
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
//如果start!=0,第一行不处理,
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
从第二个切片开始,都放弃第一行的处理,那么Map任务处理的时候,就不存在断行的问题了。每个切片对应的Map任务都会向后多读取一行并进行处理。