
Java code examples for org.apache.hadoop.hdfs.net.Peer

Project: hadoop-oss    File: NuCypherExtUtilClient.java
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  try {
    peer = peerFromSocket(s);
    peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtilsClient.cleanup(null, peer);
    }
  }
}
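The method above follows an acquire-then-wrap pattern: if the SASL wrapping step throws, the half-constructed peer is closed in the finally block, and only a fully wrapped peer reaches the caller. A minimal self-contained sketch of the same pattern (acquire and wrap are hypothetical stand-ins for peerFromSocket and saslClient.peerSend, not Hadoop API):

import java.io.Closeable;
import java.io.IOException;

public class AcquireOrCleanup {
  // Hypothetical stand-ins for peerFromSocket and saslClient.peerSend.
  static Closeable acquire() throws IOException { return () -> {}; }
  static Closeable wrap(Closeable c) throws IOException { return c; }

  static Closeable open() throws IOException {
    Closeable resource = null;
    boolean success = false;
    try {
      resource = acquire();       // step 1: may throw
      resource = wrap(resource);  // step 2: may also throw
      success = true;
      return resource;            // on success the caller owns the resource
    } finally {
      if (!success && resource != null) {
        resource.close();         // close only on the failure path
      }
    }
  }

  public static void main(String[] args) throws IOException {
    try (Closeable c = open()) {
      System.out.println("opened and wrapped");
    }
  }
}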
Project: hadoop    File: RemoteBlockReader2.java
protected RemoteBlockReader2(String file, String bpid, long blockId,
    DataChecksum checksum, boolean verifyChecksum, long startOffset,
    long firstChunkOffset, long bytesToRead, Peer peer,
    DatanodeID datanodeID, PeerCache peerCache) {
  this.isLocal = DFSClient.isLocalAddress(NetUtils.
      createSocketAddr(datanodeID.getXferAddr()));
  // Path is used only for printing block and file information in debug
  this.peer = peer;
  this.datanodeID = datanodeID;
  this.in = peer.getInputStreamChannel();
  this.checksum = checksum;
  this.verifyChecksum = verifyChecksum;
  this.startOffset = Math.max(startOffset, 0);
  this.filename = file;
  this.peerCache = peerCache;
  this.blockId = blockId;

  // The total number of bytes that we need to transfer from the DN is
  // the amount that the user wants (bytesToRead), plus the padding at
  // the beginning in order to chunk-align. Note that the DN may elect
  // to send more than this amount if the read starts/ends mid-chunk.
  this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();
}
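The padding arithmetic described in the final comment is easy to verify with concrete numbers (hypothetical values, not from the source): a read of 4096 bytes starting at offset 1000, whose enclosing checksum chunk begins at offset 512, requires the datanode to send 4096 + (1000 - 512) = 4584 bytes:

public class ChunkAlignDemo {
  public static void main(String[] args) {
    long startOffset = 1000;     // first byte the caller wants
    long firstChunkOffset = 512; // start of the checksum chunk containing it
    long bytesToRead = 4096;     // bytes the caller asked for

    // bytes the DN must send: the request plus the leading chunk padding
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
    System.out.println(bytesNeededToFinish); // 4584
  }
}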
Project: hadoop    File: SaslDataTransferServer.java
/**
 * Receives SASL negotiation for specialized encrypted handshake.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getEncryptedStreams(Peer peer,
    OutputStream underlyingOut, InputStream underlyingIn) throws IOException {
  if (peer.hasSecureChannel() ||
      dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  Map<String, String> saslProps = createSaslPropertiesForEncryption(
    dnConf.getEncryptionAlgorithm());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Server using encryption algorithm " +
      dnConf.getEncryptionAlgorithm());
  }

  CallbackHandler callbackHandler = new SaslServerCallbackHandler(
    new PasswordFunction() {
      @Override
      public char[] apply(String userName) throws IOException {
        return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
      }
    });
  return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
      callbackHandler);
}
Project: hadoop    File: SaslDataTransferServer.java
/**
 * Receives SASL negotiation for general-purpose handshake.
 *
 * @param peer connection peer
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @return new pair of streams, wrapped after SASL negotiation
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
    InputStream underlyingIn) throws IOException {
  if (peer.hasSecureChannel() ||
      dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
    return new IOStreamPair(underlyingIn, underlyingOut);
  }

  SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
  Map<String, String> saslProps = saslPropsResolver.getServerProperties(
    getPeerAddress(peer));

  CallbackHandler callbackHandler = new SaslServerCallbackHandler(
    new PasswordFunction() {
      @Override
      public char[] apply(String userName) throws IOException {
        return buildServerPassword(userName);
      }
  });
  return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
      callbackHandler);
}
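getEncryptedStreams and getSaslStreams differ only in the PasswordFunction they hand to SaslServerCallbackHandler: one derives the server password from an encryption key, the other from the user name. A self-contained sketch of that strategy-object pattern (the interface shape mirrors the snippets above; fromSecret and its derivation rule are hypothetical):

import java.io.IOException;

public class PasswordDemo {
  interface PasswordFunction {
    char[] apply(String userName) throws IOException;
  }

  static PasswordFunction fromSecret(final String secret) {
    return new PasswordFunction() {
      @Override
      public char[] apply(String userName) throws IOException {
        // toy derivation: combine a shared secret with the user name
        return (secret + "/" + userName).toCharArray();
      }
    };
  }

  public static void main(String[] args) throws IOException {
    PasswordFunction f = fromSecret("s3cret");
    System.out.println(new String(f.apply("hdfs"))); // s3cret/hdfs
  }
}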
Project: hadoop    File: BlockReaderFactory.java
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
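nextDomainPeer consults the peer cache first and only then constructs a fresh DomainPeer. The same cache-then-create shape, reduced to a runnable sketch (Deque and Supplier are stand-ins for PeerCache and the domain socket factory):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

public class CacheThenCreate {
  static <T> T getOrCreate(Deque<T> cache, Supplier<T> factory) {
    T cached = cache.pollFirst(); // reuse a cached entry if one exists
    return (cached != null) ? cached : factory.get(); // else build a new one
  }

  public static void main(String[] args) {
    Deque<String> cache = new ArrayDeque<>();
    System.out.println(getOrCreate(cache, () -> "fresh"));  // fresh
    cache.add("cached");
    System.out.println(getOrCreate(cache, () -> "fresh"));  // cached
  }
}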
Project: hadoop    File: DFSClient.java
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  try {
    sock = socketFactory.createSocket();
    NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
        dfsClientConf.socketTimeout);
    peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
        blockToken, datanodeId);
    peer.setReadTimeout(dfsClientConf.socketTimeout);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.cleanup(LOG, peer);
      IOUtils.closeSocket(sock);
    }
  }
}
Project: hadoop    File: PeerCache.java
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }

  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    iter.remove();
    long ageMs = Time.monotonicNow() - candidate.getTime();
    Peer peer = candidate.getPeer();
    if (ageMs >= expiryPeriod) {
      try {
        peer.close();
      } catch (IOException e) {
        LOG.warn("got IOException closing stale peer " + peer +
              ",which is " + ageMs + " ms old");
      }
    } else if (!peer.isClosed()) {
      return peer;
    }
  }
  return null;
}
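Note that getInternal scans destructively: every visited entry is removed from the cached list, stale entries are closed and skipped, and the first fresh, still-open peer is returned. A runnable sketch of that scan with plain types (Entry is a hypothetical stand-in for the cache's Value; times are plain longs):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ExpiryScan {
  static final long EXPIRY_MS = 3000;

  static class Entry {
    final String peer; final long createdAtMs; final boolean closed;
    Entry(String peer, long createdAtMs, boolean closed) {
      this.peer = peer; this.createdAtMs = createdAtMs; this.closed = closed;
    }
  }

  static String take(List<Entry> cached, long nowMs) {
    Iterator<Entry> iter = cached.iterator();
    while (iter.hasNext()) {
      Entry e = iter.next();
      iter.remove();                 // every visited entry leaves the cache
      long ageMs = nowMs - e.createdAtMs;
      if (ageMs >= EXPIRY_MS) {
        continue;                    // stale: the real code closes it here
      } else if (!e.closed) {
        return e.peer;               // first fresh, still-open entry wins
      }
    }
    return null;                     // nothing reusable was cached
  }

  public static void main(String[] args) {
    List<Entry> cached = new ArrayList<>();
    cached.add(new Entry("stale-peer", 0, false));
    cached.add(new Entry("fresh-peer", 9000, false));
    System.out.println(take(cached, 10000)); // fresh-peer
  }
}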
Project: hadoop    File: DataXceiver.java
private DataXceiver(Peer peer, DataNode datanode,
    DataXceiverServer dataXceiverServer) throws IOException {

  this.peer = peer;
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketOut = peer.getOutputStream();
  this.datanode = datanode;
  this.dataXceiverServer = dataXceiverServer;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
  remoteAddress = peer.getRemoteAddressString();
  final int colonIdx = remoteAddress.indexOf(':');
  remoteAddressWithoutPort =
      (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
  localAddress = peer.getLocalAddressString();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
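The constructor derives remoteAddressWithoutPort by cutting the remote address at its first colon. A quick check of that logic (the addresses are made up):

public class StripPort {
  static String withoutPort(String address) {
    int colonIdx = address.indexOf(':');
    return (colonIdx < 0) ? address : address.substring(0, colonIdx);
  }

  public static void main(String[] args) {
    System.out.println(withoutPort("10.0.0.5:50010")); // 10.0.0.5
    System.out.println(withoutPort("10.0.0.5"));       // 10.0.0.5
  }
}

Cutting at the first colon assumes IPv4 or hostname forms; an unbracketed IPv6 literal would be truncated at its first group.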
Project: aliyun-oss-hadoop-fs    File: RemoteBlockReader2.java
protected RemoteBlockReader2(String file, String bpid, long blockId,
    DataChecksum checksum, boolean verifyChecksum, long startOffset,
    long firstChunkOffset, long bytesToRead, Peer peer,
    DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
  this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
      createSocketAddr(datanodeID.getXferAddr()));
  // Path is used only for printing block and file information in debug
  this.peer = peer;
  this.datanodeID = datanodeID;
  this.in = peer.getInputStreamChannel();
  this.checksum = checksum;
  this.verifyChecksum = verifyChecksum;
  this.startOffset = Math.max(startOffset, 0);
  this.filename = file;
  this.peerCache = peerCache;
  this.blockId = blockId;

  // The total number of bytes that we need to transfer from the DN is
  // the amount that the user wants (bytesToRead), plus the padding at
  // the beginning in order to chunk-align. Note that the DN may elect
  // to send more than this amount if the read starts/ends mid-chunk.
  this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();
  this.tracer = tracer;
}
Project: aliyun-oss-hadoop-fs    File: DFSClient.java
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  final int socketTimeout = dfsClientConf.getSocketTimeout();
  try {
    sock = socketFactory.createSocket();
    NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
        socketTimeout);
    peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
        blockToken, datanodeId);
    peer.setReadTimeout(socketTimeout);
    peer.setWriteTimeout(socketTimeout);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtilsClient.cleanup(LOG, peer);
      IOUtils.closeSocket(sock);
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: DFSUtilClient.java
public static Peer peerFromSocketAndKey(
      SaslDataTransferClient saslClient, Socket s,
      DataEncryptionKeyFactory keyFactory,
      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
      throws IOException {
  Peer peer = null;
  boolean success = false;
  try {
    peer = peerFromSocket(s);
    peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtilsClient.cleanup(null, peer);
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: PeerCache.java
(Same as the getInternal implementation shown above for the hadoop project.)
Project: aliyun-oss-hadoop-fs    File: SaslDataTransferServer.java
(Same as the getEncryptedStreams implementation shown above for the hadoop project.)
Project: aliyun-oss-hadoop-fs    File: SaslDataTransferServer.java
(Same as the getSaslStreams implementation shown above for the hadoop project.)
Project: aliyun-oss-hadoop-fs    File: DataXceiver.java
private DataXceiver(Peer peer, DataNode datanode,
    DataXceiverServer dataXceiverServer) throws IOException {
  super(datanode.getTracer());
  this.peer = peer;
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketOut = peer.getOutputStream();
  this.datanode = datanode;
  this.dataXceiverServer = dataXceiverServer;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
  this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf());
  this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf());
  remoteAddress = peer.getRemoteAddressString();
  final int colonIdx = remoteAddress.indexOf(':');
  remoteAddressWithoutPort =
      (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx);
  localAddress = peer.getLocalAddressString();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
Project: aliyun-oss-hadoop-fs    File: ErasureCodingWorker.java
private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  final int socketTimeout = datanode.getDnConf().getSocketTimeout(); 
  try {
    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
    NetUtils.connect(sock, addr, socketTimeout);
    peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(), sock,
        datanode.getDataEncryptionKeyFactoryForBlock(b), blockToken,
        datanodeId);
    peer.setReadTimeout(socketTimeout);
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.cleanup(null, peer);
      IOUtils.closeSocket(sock);
    }
  }
}
Project: big-c    File: RemoteBlockReader2.java
(Same as the RemoteBlockReader2 constructor shown above for the hadoop project.)
Project: big-c    File: SaslDataTransferServer.java
(Same as the getEncryptedStreams implementation shown above for the hadoop project.)
Project: big-c    File: SaslDataTransferServer.java
(Same as the getSaslStreams implementation shown above for the hadoop project.)
Project: big-c    File: BlockReaderFactory.java
(Same as the nextDomainPeer implementation shown above for the hadoop project.)
Project: big-c    File: DFSClient.java
(Same as the newConnectedPeer implementation shown above for the hadoop project.)
Project: big-c    File: PeerCache.java
(Same as the getInternal implementation shown above for the hadoop project.)
Project: big-c    File: DataXceiver.java
(Same as the DataXceiver constructor shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: RemoteBlockReader2.java
(Same as the RemoteBlockReader2 constructor shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: RemoteBlockReader2.java
static void checkSuccess(
    BlockOpResponseProto status, Peer peer,
    ExtendedBlock block, String file)
    throws IOException {
  if (status.getStatus() != Status.SUCCESS) {
    if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
      throw new InvalidBlockTokenException(
          "Got access token error for OP_READ_BLOCK, self="
              + peer.getLocalAddressString() + ", remote="
              + peer.getRemoteAddressString() + ", for file " + file
              + ", for pool " + block.getBlockPoolId() + " block "
              + block.getBlockId() + "_" + block.getGenerationStamp());
    } else {
      throw new IOException("Got error for OP_READ_BLOCK, self="
          + peer.getLocalAddressString() + ", remote="
          + peer.getRemoteAddressString() + ", for file " + file
          + ", for pool " + block.getBlockPoolId() + " block "
          + block.getBlockId() + "_" + block.getGenerationStamp());
    }
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: SaslDataTransferServer.java
(Same as the getEncryptedStreams implementation shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: SaslDataTransferServer.java
(Same as the getSaslStreams implementation shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: BlockReaderFactory.java
(Same as the nextDomainPeer implementation shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: DFSClient.java
(Same as the newConnectedPeer implementation shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: PeerCache.java
(Same as the getInternal implementation shown above for the hadoop project.)
Project: hadoop-2.6.0-cdh5.4.3    File: DataXceiver.java
(Same as the DataXceiver constructor shown above for the hadoop project.)
Project: hadoop-plus    File: DFSInputStream.java
private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
  Peer peer = null;
  boolean success = false;
  Socket sock = null;
  try {
    sock = dfsClient.socketFactory.createSocket();
    NetUtils.connect(sock, addr,
        dfsClient.getRandomLocalInterfaceAddr(),
        dfsClient.getConf().socketTimeout);
    peer = TcpPeerServer.peerFromSocketAndKey(sock,
        dfsClient.getDataEncryptionKey());
    success = true;
    return peer;
  } finally {
    if (!success) {
      IOUtils.closeQuietly(peer);
      IOUtils.closeQuietly(sock);
    }
  }
}
Project: hadoop-plus    File: RemoteBlockReader2.java
(Same as the RemoteBlockReader2 constructor shown above for the hadoop project.)
Project: hadoop-plus    File: RemoteBlockReader2.java
(Same as the checkSuccess implementation shown above for the hadoop-2.6.0-cdh5.4.3 project.)
Project: hadoop-plus    File: PeerCache.java
/**
 * Get a cached peer connected to the given Datanode.
 * @param dnId         The Datanode to get a Peer for.
 * @param isDomain     Whether to retrieve a DomainPeer or not.
 *
 * @return             An open Peer connected to the DN, or null if none
 *                     was found. 
 */
public synchronized Peer get(DatanodeID dnId, boolean isDomain) {

  if (capacity <= 0) { // disabled
    return null;
  }

  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }

  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    iter.remove();
    if (!candidate.getPeer().isClosed()) {
      return candidate.getPeer();
    }
  }
  return null;
}
Project: hadoop-plus    File: PeerCache.java
/**
 * Give an unused socket to the cache.
 * @param sock socket not used by anyone.
 */
public synchronized void put(DatanodeID dnId, Peer peer) {
  Preconditions.checkNotNull(dnId);
  Preconditions.checkNotNull(peer);
  if (peer.isClosed()) return;
  if (capacity <= 0) {
    // Cache disabled.
    IOUtils.cleanup(LOG, peer);
    return;
  }

  startExpiryDaemon();

  if (capacity == multimap.size()) {
    evictOldest();
  }
  multimap.put(new Key(dnId, peer.getDomainSocket() != null),
      new Value(peer, Time.monotonicNow()));
}
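put drops closed peers, refuses to cache when capacity is zero, and evicts the oldest entry once the cache is full. The same bookkeeping on a plain LinkedHashMap, as a sketch (the real cache is a multimap keyed by datanode and socket type, with time-stamped values):

import java.util.LinkedHashMap;

public class BoundedCache<K, V> {
  private final int capacity;
  private final LinkedHashMap<K, V> map = new LinkedHashMap<>(); // insertion order

  public BoundedCache(int capacity) { this.capacity = capacity; }

  public void put(K key, V value) {
    if (capacity <= 0) {
      return;                                  // cache disabled: nothing stored
    }
    if (map.size() == capacity) {
      K oldest = map.keySet().iterator().next(); // first-inserted entry
      map.remove(oldest);                        // evictOldest()
    }
    map.put(key, value);
  }

  public static void main(String[] args) {
    BoundedCache<String, String> c = new BoundedCache<>(2);
    c.put("a", "1"); c.put("b", "2"); c.put("c", "3");
    System.out.println(c.map.keySet()); // [b, c]
  }
}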
Project: hadoop-plus    File: DataXceiver.java
private DataXceiver(Peer peer, DataNode datanode,
    DataXceiverServer dataXceiverServer) throws IOException {

  this.peer = peer;
  this.dnConf = datanode.getDnConf();
  this.socketIn = peer.getInputStream();
  this.socketOut = peer.getOutputStream();
  this.datanode = datanode;
  this.dataXceiverServer = dataXceiverServer;
  this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
  remoteAddress = peer.getRemoteAddressString();
  localAddress = peer.getLocalAddressString();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Number of active connections is: "
        + datanode.getXceiverCount());
  }
}
Project: FlexMap    File: RemoteBlockReader2.java
(Same as the RemoteBlockReader2 constructor shown above for the hadoop project.)
Project: FlexMap    File: RemoteBlockReader2.java
(Same as the checkSuccess implementation shown above for the hadoop-2.6.0-cdh5.4.3 project.)
Project: FlexMap    File: SaslDataTransferServer.java
(Same as the getEncryptedStreams implementation shown above for the hadoop project.)
