项目:hadoop-oss
文件:NuCypherExtUtilClient.java
/**
 * Builds a {@link Peer} from the given socket and passes it through
 * {@code saslClient.peerSend}.
 *
 * <p>If either step throws, whatever peer reference was obtained so far
 * (possibly {@code null}) is handed to {@code IoUtilsClient.cleanup} before
 * the exception propagates. The socket itself is not closed directly here.
 *
 * @param saslClient client whose {@code peerSend} is applied to the new peer
 * @param s socket passed to {@code peerFromSocket}
 * @param keyFactory forwarded unchanged to {@code peerSend}
 * @param blockToken forwarded unchanged to {@code peerSend}
 * @param datanodeId forwarded unchanged to {@code peerSend}
 * @return the peer returned by {@code saslClient.peerSend}
 * @throws IOException propagated from {@code peerFromSocket} or
 *         {@code peerSend}
 */
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer current = null;
  boolean handedOff = false;
  try {
    current = peerFromSocket(s);
    // Reassigning keeps the pre-negotiation peer reachable for cleanup
    // if peerSend throws before the assignment completes.
    current = saslClient.peerSend(current, keyFactory, blockToken, datanodeId);
    handedOff = true;
  } finally {
    if (!handedOff) {
      IoUtilsClient.cleanup(null, current);
    }
  }
  return current;
}
项目:hadoop
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,String bpid,long blockId,DataChecksum checksum,boolean verifyChecksum,long startOffset,long firstChunkOffset,long bytesToRead,Peer peer,DatanodeID datanodeID,PeerCache peerCache) {
this.isLocal = DFSClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
// Path is used only for printing block and file information in debug
this.peer = peer;
this.datanodeID = datanodeID;
this.in = peer.getInputStreamChannel();
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max( startOffset,0 );
this.filename = file;
this.peerCache = peerCache;
this.blockId = blockId;
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead),plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
项目:hadoop
文件:SaslDataTransferServer.java
/**
 * Receives SASL negotiation for specialized encrypted handshake.
 *
 * <p>When the peer already reports a secure channel, or the configured
 * trusted channel resolver accepts the peer's address, the underlying
 * streams are returned without any handshake.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn) throws IOException {
  final boolean alreadyProtected = peer.hasSecureChannel()
      || dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer));
  if (alreadyProtected) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  Map<String, String> encryptionProps =
      createSaslPropertiesForEncryption(dnConf.getEncryptionAlgorithm());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Server using encryption algorithm " +
        dnConf.getEncryptionAlgorithm());
  }

  // Derives the SASL password from the encryption key looked up by user name.
  PasswordFunction keyLookup = new PasswordFunction() {
    @Override
    public char[] apply(String userName) throws IOException {
      return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
    }
  };
  CallbackHandler handler = new SaslServerCallbackHandler(keyLookup);

  return doSaslHandshake(underlyingOut, underlyingIn, encryptionProps, handler);
}
项目:hadoop
文件:SaslDataTransferServer.java
/**
* Receives SASL negotiation for general-purpose handshake.
*
* @param peer connection peer
* @param underlyingOut connection output stream
* @param underlyingIn connection input stream
* @return new pair of streams,wrapped after SASL negotiation
* @throws IOException for any error
*/
private IOStreamPair getSaslStreams(Peer peer,underlyingOut);
}
SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
Map<String,String> saslProps = saslPropsResolver.getServerProperties(
getPeerAddress(peer));
CallbackHandler callbackHandler = new SaslServerCallbackHandler(
new PasswordFunction() {
@Override
public char[] apply(String userName) throws IOException {
return buildServerPassword(userName);
}
});
return doSaslHandshake(underlyingOut,callbackHandler);
}
项目:hadoop
文件:BlockReaderFactory.java
/**
 * Get the next DomainPeer -- either from the cache or by creating it.
 *
 * <p>The peer cache is consulted only while {@code remainingCacheTries} is
 * positive; this method does not modify that counter itself.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    final Peer cached = clientContext.getPeerCache().get(datanode, true);
    if (cached != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + cached);
      }
      // Second argument marks the peer as coming from the cache.
      return new BlockReaderPeer(cached, true);
    }
  }

  // Nothing usable in the cache: try to open a fresh domain socket.
  final DomainSocket fresh = clientContext.getDomainSocketFactory()
      .createSocket(pathInfo, conf.socketTimeout);
  return (fresh == null)
      ? null
      : new BlockReaderPeer(new DomainPeer(fresh), false);
}
项目:hadoop
文件:DFSClient.java
/**
 * Creates a socket, connects it to {@code addr}, and wraps it in a
 * {@link Peer} whose read timeout is set from the client configuration.
 *
 * <p>On any failure the partially built peer and the socket are released
 * before the exception propagates.
 *
 * @param addr address passed to {@code NetUtils.connect}
 * @param datanodeId forwarded to {@code peerFromSocketAndKey}
 * @return the connected peer
 * @throws IOException propagated from socket creation, connection, or
 *         peer construction
 */
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr, DatanodeID datanodeId)
    throws IOException {
  Socket rawSocket = null;
  Peer connected = null;
  boolean established = false;
  try {
    rawSocket = socketFactory.createSocket();
    NetUtils.connect(rawSocket, addr, getRandomLocalInterfaceAddr(),
        dfsClientConf.socketTimeout);
    connected = TcpPeerServer.peerFromSocketAndKey(saslClient, rawSocket, this,
        datanodeId);
    connected.setReadTimeout(dfsClientConf.socketTimeout);
    established = true;
  } finally {
    if (!established) {
      // Release the peer first, then the socket, as in the failure path.
      IoUtils.cleanup(LOG, connected);
      IoUtils.closeSocket(rawSocket);
    }
  }
  return connected;
}
项目:hadoop
文件:PeerCache.java
/**
 * Removes and returns the first usable cached peer for the given key.
 *
 * <p>Entries are removed from the cache as they are examined. An entry whose
 * age is at least {@code expiryPeriod} is closed and skipped; a younger
 * entry that is already closed is skipped; the first younger, still-open
 * peer is returned.
 *
 * @param dnId datanode component of the cache key
 * @param isDomain domain-peer flag component of the cache key
 * @return a cached peer that is neither expired nor closed, or {@code null}
 *         if the key has no entries or none qualifies
 */
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }

  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    // Every examined entry leaves the cache, whether or not it is returned.
    iter.remove();
    long ageMs = Time.monotonicNow() - candidate.getTime();
    Peer peer = candidate.getPeer();
    if (ageMs >= expiryPeriod) {
      try {
        peer.close();
      } catch (IOException e) {
        // Best effort: keep scanning, but log the cause so a failed close
        // can be diagnosed.
        LOG.warn("got IOException closing stale peer " + peer +
            ",which is " + ageMs + " ms old", e);
      }
    } else if (!peer.isClosed()) {
      return peer;
    }
  }
  return null;
}
/**
 * Captures the peer, its streams and addresses, and the owning datanode
 * and server.
 *
 * @param peer connection whose streams and address strings are read
 * @param datanode source of the datanode configuration
 * @param dataxceiverServer stored as-is
 * @throws IOException propagated from the calls made on {@code peer}
 */
private dataxceiver(Peer peer, Datanode datanode,
    dataxceiverServer dataxceiverServer) throws IOException {
  // Plain references.
  this.peer = peer;
  this.datanode = datanode;
  this.dataxceiverServer = dataxceiverServer;

  // Values derived from the datanode and the peer; the call order matches
  // the original sequence.
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketout = peer.getoutputStream();
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

  remoteAddress = peer.getRemoteAddressstring();
  // Keep everything before the first ':'; if there is none, keep the whole
  // string.
  final int portSeparator = remoteAddress.indexOf(':');
  if (portSeparator < 0) {
    remoteAddressWithoutPort = remoteAddress;
  } else {
    remoteAddressWithoutPort = remoteAddress.substring(0, portSeparator);
  }
  localAddress = peer.getLocalAddressstring();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
项目:aliyun-oss-hadoop-fs
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,PeerCache peerCache,Tracer tracer) {
this.isLocal = DfsutilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
// Path is used only for printing block and file information in debug
this.peer = peer;
this.datanodeID = datanodeID;
this.in = peer.getInputStreamChannel();
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max( startOffset,plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.tracer = tracer;
}
项目:aliyun-oss-hadoop-fs
文件:DFSClient.java
/**
 * Creates a socket, connects it, and wraps it in a {@link Peer} whose read
 * and write timeouts are both set to the configured socket timeout.
 *
 * <p>On any failure the partially built peer and the socket are released
 * before the exception propagates.
 *
 * @param addr target address (not referenced in the visible body)
 * @param datanodeId forwarded to {@code peerFromSocketAndKey}
 * @return the connected peer
 * @throws IOException propagated from socket creation, connection, or
 *         peer construction
 */
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr, DatanodeID datanodeId)
    throws IOException {
  final int timeout = dfsClientConf.getSocketTimeout();
  Socket rawSocket = null;
  Peer connected = null;
  boolean established = false;
  try {
    rawSocket = socketFactory.createSocket();
    NetUtils.connect(rawSocket, timeout);
    connected = DfsutilClient.peerFromSocketAndKey(saslClient, datanodeId);
    // The same timeout applies in both directions.
    connected.setReadTimeout(timeout);
    connected.setWriteTimeout(timeout);
    established = true;
  } finally {
    if (!established) {
      IoUtilsClient.cleanup(LOG, connected);
      IoUtils.closeSocket(rawSocket);
    }
  }
  return connected;
}
项目:aliyun-oss-hadoop-fs
文件:PeerCache.java
private synchronized Peer getInternal(DatanodeID dnId,which is " + ageMs + " ms old");
}
} else if (!peer.isClosed()) {
return peer;
}
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:SaslDataTransferServer.java
项目:aliyun-oss-hadoop-fs
文件:SaslDataTransferServer.java
/**
 * Captures the peer, its streams and addresses, buffer sizes, and the
 * owning server.
 *
 * <p>NOTE(review): the body reads {@code datanode}, which is not among the
 * visible parameters -- confirm the signature against the upstream source.
 *
 * @param peer connection whose streams and address strings are read
 * @param dataxceiverServer stored as-is
 * @throws IOException propagated from the calls made on {@code peer}
 */
private dataxceiver(Peer peer, dataxceiverServer dataxceiverServer)
    throws IOException {
  super(datanode.getTracer());

  this.peer = peer;
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketout = peer.getoutputStream();
  this.datanode = datanode;
  this.dataxceiverServer = dataxceiverServer;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

  // Buffer sizes come from the datanode's configuration.
  this.iofilebufferSize =
      DfsutilClient.getIofilebufferSize(datanode.getConf());
  this.smallBufferSize =
      DfsutilClient.getSmallBufferSize(datanode.getConf());

  remoteAddress = peer.getRemoteAddressstring();
  // Keep everything before the first ':'; if there is none, keep the whole
  // string.
  final int portSeparator = remoteAddress.indexOf(':');
  if (portSeparator < 0) {
    remoteAddressWithoutPort = remoteAddress;
  } else {
    remoteAddressWithoutPort = remoteAddress.substring(0, portSeparator);
  }
  localAddress = peer.getLocalAddressstring();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
项目:aliyun-oss-hadoop-fs
文件:ErasureCodingWorker.java
/**
 * Creates a socket from the default socket factory, connects it, and wraps
 * it in a {@link Peer} with the datanode's configured socket timeout as
 * read timeout.
 *
 * <p>On any failure the partially built peer and the socket are released
 * before the exception propagates.
 *
 * @param b block used to obtain the data encryption key factory
 * @param addr target address (not referenced in the visible body)
 * @param datanodeId forwarded to {@code peerFromSocketAndKey}
 * @return the connected peer
 * @throws IOException propagated from socket creation, connection, or
 *         peer construction
 */
private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
    DatanodeID datanodeId) throws IOException {
  final int timeout = datanode.getDnConf().getSocketTimeout();
  Socket rawSocket = null;
  Peer connected = null;
  boolean established = false;
  try {
    rawSocket = NetUtils.getDefaultSocketFactory(conf).createSocket();
    NetUtils.connect(rawSocket, timeout);
    connected = DfsutilClient.peerFromSocketAndKey(datanode.getSaslClient(),
        datanode.getDataEncryptionKeyFactoryForBlock(b), datanodeId);
    connected.setReadTimeout(timeout);
    established = true;
  } finally {
    if (!established) {
      IoUtils.cleanup(null, connected);
      IoUtils.closeSocket(rawSocket);
    }
  }
  return connected;
}
项目:big-c
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
项目:big-c
文件:SaslDataTransferServer.java
项目:big-c
文件:SaslDataTransferServer.java
项目:big-c
文件:BlockReaderFactory.java
/**
* Get the next DomainPeer-- either from the cache or by creating it.
*
* @return the next DomainPeer,false);
}
项目:big-c
文件:DFSClient.java
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,peer);
IoUtils.closeSocket(sock);
}
}
}
项目:big-c
文件:PeerCache.java
private synchronized Peer getInternal(DatanodeID dnId,which is " + ageMs + " ms old");
}
} else if (!peer.isClosed()) {
return peer;
}
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,0 );
this.filename = file;
this.peerCache = peerCache;
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead),plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
项目:hadoop-2.6.0-cdh5.4.3
文件:RemoteBlockReader2.java
/**
 * Throws if the given response does not report {@code Status.SUCCESS}.
 *
 * <p>NOTE(review): the body reads {@code peer}, which is not among the
 * visible parameters -- confirm the signature against the upstream source.
 *
 * @param status response whose status is inspected
 * @param block block whose pool id, block id and generation stamp appear in
 *        the error message
 * @param file file name included in the error message
 * @throws InvalidBlockTokenException when the status is
 *         {@code Status.ERROR_ACCESS_TOKEN}
 * @throws IOException for any other non-success status
 */
static void checkSuccess(
    BlockOpResponseProto status, ExtendedBlock block, String file)
    throws IOException {
  if (status.getStatus() == Status.SUCCESS) {
    return;
  }

  final boolean tokenRejected =
      status.getStatus() == Status.ERROR_ACCESS_TOKEN;

  // Both failure messages share everything after their opening literal.
  final String details = peer.getLocalAddressstring() + ",remote="
      + peer.getRemoteAddressstring() + ",for file " + file
      + ",for pool " + block.getBlockPoolId() + " block "
      + block.getBlockId() + "_" + block.getGenerationStamp();

  if (tokenRejected) {
    throw new InvalidBlockTokenException(
        "Got access token error for OP_READ_BLOCK,self=" + details);
  }
  throw new IOException("Got error for OP_READ_BLOCK,self=" + details);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:SaslDataTransferServer.java
项目:hadoop-2.6.0-cdh5.4.3
文件:SaslDataTransferServer.java
项目:hadoop-2.6.0-cdh5.4.3
文件:BlockReaderFactory.java
/**
* Get the next DomainPeer-- either from the cache or by creating it.
*
* @return the next DomainPeer,false);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:DFSClient.java
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,peer);
IoUtils.closeSocket(sock);
}
}
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PeerCache.java
private synchronized Peer getInternal(DatanodeID dnId,which is " + ageMs + " ms old");
}
} else if (!peer.isClosed()) {
return peer;
}
}
return null;
}
项目:hadoop-plus
文件:DFSInputStream.java
/**
 * Creates a socket through the DFS client's socket factory, connects it,
 * and wraps it in a {@link Peer} using the client's data encryption key.
 *
 * <p>On any failure the partially built peer and the socket are closed
 * quietly before the exception propagates.
 *
 * @param addr target address (not referenced in the visible body)
 * @return the new peer
 * @throws IOException propagated from socket creation, connection, or
 *         peer construction
 */
private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
  Socket rawSocket = null;
  Peer connected = null;
  boolean established = false;
  try {
    rawSocket = dfsClient.socketFactory.createSocket();
    NetUtils.connect(rawSocket, dfsClient.getRandomLocalInterfaceAddr(),
        dfsClient.getConf().socketTimeout);
    connected = TcpPeerServer.peerFromSocketAndKey(rawSocket,
        dfsClient.getDataEncryptionKey());
    established = true;
  } finally {
    if (!established) {
      IoUtils.closeQuietly(connected);
      IoUtils.closeQuietly(rawSocket);
    }
  }
  return connected;
}
项目:hadoop-plus
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
项目:hadoop-plus
文件:RemoteBlockReader2.java
static void checkSuccess(
BlockOpResponseProto status,for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
}
项目:hadoop-plus
文件:PeerCache.java
/**
 * Get a cached peer connected to the given Datanode.
 *
 * <p>Entries are removed from the cache as they are examined; closed peers
 * are dropped and the first peer that is not closed is returned. A
 * non-positive {@code capacity} means the cache is disabled.
 *
 * @param dnId The Datanode to get a Peer for.
 * @param isDomain Whether to retrieve a DomainPeer or not.
 *
 * @return An open Peer connected to the DN, or null if none
 *         was found.
 */
public synchronized Peer get(DatanodeID dnId, boolean isDomain) {
  if (capacity <= 0) { // disabled
    return null;
  }

  final List<Value> cachedEntries = multimap.get(new Key(dnId, isDomain));
  if (cachedEntries == null) {
    return null;
  }

  for (Iterator<Value> it = cachedEntries.iterator(); it.hasNext();) {
    final Value entry = it.next();
    // The entry leaves the cache whether or not its peer is usable.
    it.remove();
    if (entry.getPeer().isClosed()) {
      continue;
    }
    return entry.getPeer();
  }
  return null;
}
项目:hadoop-plus
文件:PeerCache.java
/**
 * Give an unused peer to the cache.
 *
 * <p>A peer that is already closed is ignored. When the cache is disabled
 * (non-positive {@code capacity}) the peer is cleaned up instead of being
 * stored. When the cache is at capacity, {@code evictOldest()} is called
 * before the new entry is added.
 *
 * @param dnId datanode the peer is connected to; must not be null
 * @param peer peer not used by anyone; must not be null
 */
public synchronized void put(DatanodeID dnId, Peer peer) {
  Preconditions.checkNotNull(dnId);
  Preconditions.checkNotNull(peer);

  if (peer.isClosed()) {
    return;
  }
  if (capacity <= 0) {
    // Cache disabled.
    IoUtils.cleanup(LOG, peer);
    return;
  }

  startExpiryDaemon();

  if (multimap.size() == capacity) {
    evictOldest();
  }

  // The key records whether the peer is backed by a domain socket.
  final Key cacheKey = new Key(dnId, peer.getDomainSocket() != null);
  final Value cacheEntry = new Value(peer, Time.monotonicNow());
  multimap.put(cacheKey, cacheEntry);
}
/**
 * Captures the peer, its streams and address strings, and the owning
 * server.
 *
 * <p>NOTE(review): the body reads {@code datanode}, which is not among the
 * visible parameters -- confirm the signature against the upstream source.
 *
 * @param peer connection whose streams and address strings are read
 * @param dataxceiverServer stored as-is
 * @throws IOException propagated from the calls made on {@code peer}
 */
private dataxceiver(Peer peer, dataxceiverServer dataxceiverServer)
    throws IOException {
  // Plain references.
  this.peer = peer;
  this.dataxceiverServer = dataxceiverServer;

  // Values derived from the datanode and the peer; the call order matches
  // the original sequence.
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketout = peer.getoutputStream();
  this.datanode = datanode;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

  remoteAddress = peer.getRemoteAddressstring();
  localAddress = peer.getLocalAddressstring();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
项目:FlexMap
文件:RemoteBlockReader2.java
protected RemoteBlockReader2(String file,plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
项目:FlexMap
文件:RemoteBlockReader2.java
static void checkSuccess(
BlockOpResponseProto status,for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
}
项目:FlexMap
文件:SaslDataTransferServer.java
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。