大数据全系列 教程
1869个小节阅读:465.3k
JAVA全系列 教程
面向对象的程序设计语言
Python全系列 教程
Python3.x版本,未来主流的版本
人工智能 教程
顺势而为,AI创新未来
大厂算法 教程
算法,程序员自我提升必经之路
C++ 教程
一门通用计算机编程语言
微服务 教程
目前业界流行的框架组合
web前端全系列 教程
通向WEB技术世界的钥匙
大数据全系列 教程
站在云端操控万千数据
AIGC全能工具班
A A
White Night
copyFromHost(host):
xxxxxxxxxx
/**
 * Copies map outputs from the given host (excerpt; parts of the body are
 * elided in this listing).
 *
 * The visible portion builds the map-output URL for the host, opens the
 * shuffle input stream through {@code openShuffleUrl}, and returns early
 * when no stream could be obtained.
 *
 * @param host the map host to fetch from
 * @throws IOException declared by the method; the throwing statements are
 *         presumably in the elided portions
 */
protected void copyFromHost(MapHost host) throws IOException {
// NOTE(review): elided section; 'maps' and 'remaining' used below are
// presumably declared here - confirm against the full source.
.....
// Construct the url and connect
URL url = getMapOutputURL(host, maps);
DataInputStream input = null;
try {
input = openShuffleUrl(host, remaining, url);
// openShuffleUrl returns null when the fetcher was stopped or the
// connection attempt failed; there is nothing to read in that case.
if (input == null) {
return;
}
// NOTE(review): elided section; presumably reads the map outputs from
// 'input' and closes the try block's handlers - confirm.
......
}
}
openShuffleUrl(host, remaining, url):
xxxxxxxxxx
/**
 * Connects to the shuffle URL and wraps the response body in a
 * {@link DataInputStream}.
 *
 * @param host      the map host being contacted
 * @param remaining the task attempts still to be fetched from {@code host}
 * @param url       the shuffle URL to open
 * @return the opened stream, or {@code null} when the fetcher was stopped,
 *         the host rejected the connection, or an I/O error occurred
 */
private DataInputStream openShuffleUrl(MapHost host,
    Set<TaskAttemptID> remaining, URL url) {
  try {
    // Opens, authenticates and verifies the connection (see
    // setupConnectionsWithRetry).
    setupConnectionsWithRetry(url);
    if (!stopped) {
      // 'connection' is the HTTP or HTTPS connection set up above.
      return new DataInputStream(connection.getInputStream());
    }
    abortConnect(host, remaining);
  } catch (TryAgainLaterException rejected) {
    LOG.warn("Connection rejected by the host " + rejected.host +
        ". Will retry later.");
    scheduler.penalize(host, rejected.backoff);
  } catch (IOException ioFailure) {
    final boolean isConnectFailure = ioFailure instanceof ConnectException;
    ioErrs.increment(1);
    LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
        " map outputs", ioFailure);
    // The connect did not succeed: report the host as failed and mark every
    // outstanding map as a failed copy, which indirectly penalizes the host.
    scheduler.hostFailed(host.getHostName());
    for (TaskAttemptID pending : remaining) {
      scheduler.copyFailed(pending, host, false, isConnectFailure);
    }
  }
  // Reached only when no stream was opened.
  return null;
}
setupConnectionsWithRetry():
xxxxxxxxxx
/**
 * Opens the connection to the given shuffle URL, attaches the URL hash,
 * connects, and finally verifies the connection.
 *
 * The method returns without further work as soon as it observes that the
 * fetcher has been stopped, either after opening or after connecting.
 *
 * @param url the shuffle URL to connect to
 * @throws IOException if any of the open/connect/verify steps fails
 */
private void setupConnectionsWithRetry(URL url) throws IOException {
  openConnectionWithRetry(url);
  if (stopped) {
    return;
  }

  // Hash of the message derived from the url, keyed with the shuffle secret.
  final String urlMessage = SecureShuffleUtils.buildMsgFrom(url);
  final String urlHash =
      SecureShuffleUtils.hashFromString(urlMessage, shuffleSecretKey);

  setupShuffleConnection(urlHash);
  connect(connection, connectionTimeout);

  // Only verify if the thread was not stopped during the connect call.
  if (!stopped) {
    verifyConnection(url, urlMessage, urlHash);
  }
}
openConnectionWithRetry():
xxxxxxxxxx
/**
 * Calls {@code openConnection(url)} in a loop until one call completes
 * (excerpt; the failure handling of the try block is elided in this
 * listing).
 *
 * @param url the URL to open a connection to
 * @throws IOException declared by the method; presumably thrown by the
 *         elided handling once retrying is given up - confirm
 */
private void openConnectionWithRetry(URL url) throws IOException {
// NOTE(review): startTime is not used in the visible code; presumably the
// elided retry handling uses it to bound the total retry time - confirm.
long startTime = Time.monotonicNow();
boolean shouldWait = true;
while (shouldWait) {
try {
// Open the connection
openConnection(url);
// openConnection returned normally, so leave the loop.
shouldWait = false;
}
// NOTE(review): elided section; presumably the catch clause(s) that decide
// whether to wait and retry or to rethrow - confirm.
......
}
}
openConnection(url):
xxxxxxxxxx
/**
 * Opens a connection to {@code url} and stores it in the {@code connection}
 * field. When SSL shuffle is enabled the connection is additionally given
 * the socket factory and hostname verifier supplied by {@code sslFactory}.
 *
 * The field is assigned only after all configuration has succeeded.
 *
 * @param url the URL to open
 * @throws IOException if the connection cannot be opened, or wrapping a
 *         {@link GeneralSecurityException} raised while creating the SSL
 *         socket factory
 */
protected synchronized void openConnection(URL url)
    throws IOException {
  final HttpURLConnection opened = (HttpURLConnection) url.openConnection();

  // Plain HTTP: nothing else to configure.
  if (!sslShuffle) {
    connection = opened;
    return;
  }

  final HttpsURLConnection secured = (HttpsURLConnection) opened;
  try {
    secured.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
  } catch (GeneralSecurityException cause) {
    throw new IOException(cause);
  }
  secured.setHostnameVerifier(sslFactory.getHostnameVerifier());
  connection = opened;
}
Reduce 端默认启动 5 个守护线程(Fetcher,数量由 mapreduce.reduce.shuffle.parallelcopies 配置)并行拷贝 map 输出数据;拷贝通过 HTTP GET 请求完成,启用 SSL shuffle 时则使用 HTTPS 连接。