Project: hadoop-EAR
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  int nsId = cluster.getNameNode().getNamespaceID();
  Block[][] blocks = cluster.getAllBlockReports(nsId);
  for (int i = 0; i < blocks.length; i++) {
    // Map each reported block back to the file that backs it on the datanode's disk.
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(nsId, b));
    }
  }
  return files;
}
Project: hadoop-on-lustre
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  Block[][] blocks = cluster.getAllBlockReports();
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(b));
    }
  }
  return files;
}
Project: cumulus
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  // In this branch the block reports are exposed as one Iterable<Block> per datanode.
  Iterable<Block>[] blocks = cluster.getAllBlockReports();
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(b));
    }
  }
  return files;
}
Project: RDFS
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  int nsId = cluster.getNameNode().getNamespaceID();
  Block[][] blocks = cluster.getAllBlockReports(nsId);
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(nsId, b));
    }
  }
  return files;
}
Project: hadoop-0.20
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  Block[][] blocks = cluster.getAllBlockReports();
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(b));
    }
  }
  return files;
}
Project: hortonworks-extension
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  Block[][] blocks = cluster.getAllBlockReports();
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(b));
    }
  }
  return files;
}
Project: hadoop-gpu
File: TestDFSShell.java
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
  List<File> files = new ArrayList<File>();
  List<DataNode> datanodes = cluster.getDataNodes();
  Block[][] blocks = cluster.getAllBlockReports();
  for (int i = 0; i < blocks.length; i++) {
    FSDataset ds = (FSDataset) datanodes.get(i).getFSDataset();
    for (Block b : blocks[i]) {
      files.add(ds.getBlockFile(b));
    }
  }
  return files;
}
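The helper above only locates block files; a test still has to do something with them. Below is a minimal, hypothetical sketch (not taken from any of the listed projects) of how such a helper is typically used in TestDFSShell-style tests: the returned files are overwritten with garbage so that a later read or -get must detect the corruption. The helper name corruptAllBlocks and the overwrite pattern are illustrative assumptions.

static void corruptAllBlocks(MiniDFSCluster cluster) throws IOException {
  // Locate every block file the cluster's datanodes currently store.
  List<File> blockFiles = getBlockFiles(cluster);
  for (File f : blockFiles) {
    // Overwrite the start of each block file so its checksum no longer matches.
    RandomAccessFile raf = new RandomAccessFile(f, "rw");
    try {
      raf.seek(0);
      raf.write("corrupt".getBytes());
    } finally {
      raf.close();
    }
  }
}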
public void testParallelCheckDirs() throws Exception {
  final DataNode datanode = cluster.getDataNodes().get(0);
  FSDataset fsDataset = (FSDataset) datanode.data;
  datanode.data = spy(fsDataset);
  final Method checkDiskMethod = DataNode.class.getDeclaredMethod(
      "checkDiskError", Exception.class);
  checkDiskMethod.setAccessible(true);

  doAnswer(new Answer() {
    public Object answer(InvocationOnMock invocation) {
      try {
        // Simulate a slow disk check so concurrent callers overlap.
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return null;
    }
  }).when(datanode.data).checkDataDir();

  Thread[] threads = new Thread[30];
  for (int i = 0; i < 30; i++) {
    threads[i] = new Thread() {
      public void run() {
        try {
          checkDiskMethod.invoke(datanode, new Exception("Fake Exception"));
        } catch (IllegalArgumentException e) {
          TestCase.fail("IllegalArgumentException");
        } catch (IllegalAccessException e) {
          TestCase.fail("IllegalAccessException");
        } catch (InvocationTargetException e) {
          TestCase.fail("InvocationTargetException");
        }
      }
    };
  }

  // 10 parallel checks should result in only one real check being launched.
  for (int i = 0; i < 10; i++) {
    threads[i].start();
  }
  for (int i = 0; i < 10; i++) {
    threads[i].join();
  }
  verify(datanode.data, times(1)).checkDataDir();

  // The next batch won't launch a new check because one finished only recently.
  for (int i = 10; i < 20; i++) {
    threads[i].start();
  }
  for (int i = 10; i < 20; i++) {
    threads[i].join();
  }
  verify(datanode.data, times(1)).checkDataDir();

  // After 2 seconds, another check should be able to run.
  Thread.sleep(2000);
  for (int i = 20; i < 30; i++) {
    threads[i].start();
  }
  for (int i = 20; i < 30; i++) {
    threads[i].join();
  }
  verify(datanode.data, times(2)).checkDataDir();
}
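The test above relies on DataNode.checkDiskError serializing and rate-limiting disk checks. The following is a minimal sketch, not the actual DataNode implementation, of the guard pattern the test exercises: at most one checkDataDir call runs at a time, and a new check is skipped if one finished within the last two seconds. The field names, the 2000 ms gap, and the method name checkDiskErrorSketch are assumptions chosen only to mirror the test's timing.

private final AtomicBoolean checkingDisk = new AtomicBoolean(false);
private volatile long lastDiskCheckTime = 0;

void checkDiskErrorSketch(Exception cause) {
  // Skip if a check finished within the last 2 seconds.
  if (System.currentTimeMillis() - lastDiskCheckTime < 2000) {
    return;
  }
  // Skip if another thread is already running a check.
  if (!checkingDisk.compareAndSet(false, true)) {
    return;
  }
  try {
    data.checkDataDir(); // the expensive call that is mocked in the test
  } catch (DiskErrorException e) {
    // a real datanode would remove failed volumes or shut down here
  } finally {
    lastDiskCheckTime = System.currentTimeMillis();
    checkingDisk.set(false);
  }
}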
Project: hadoop-on-lustre
File: BlockReaderLocal.java
/**
* The only way this object can be instantiated.
*/
static BlockReaderLocal newBlockReader(Configuration conf, String file,
    Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
    int socketTimeout, long startOffset, long length) throws IOException {
  LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
  // check the cache first
  BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
  if (pathinfo == null) {
    pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
  }

  // check to see if the file exists. It may so happen that the
  // HDFS file has been deleted and this block-lookup is occurring
  // on behalf of a new HDFS file. This time, the block file could
  // be residing in a different portion of the fs.data.dir directory.
  // In this case, we remove this entry from the cache. The next
  // call to this method will re-populate the cache.
  FileInputStream dataIn = null;
  FileInputStream checksumIn = null;
  BlockReaderLocal localBlockReader = null;
  boolean skipChecksum = shortCircuitChecksum(conf);
  try {
    // get a local file system
    File blkfile = new File(pathinfo.getBlockPath());
    dataIn = new FileInputStream(blkfile);

    if (LOG.isDebugEnabled()) {
      LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
          + blkfile.length() + " startOffset " + startOffset + " length "
          + length + " short circuit checksum " + skipChecksum);
    }

    if (!skipChecksum) {
      // get the metadata file
      File metafile = new File(pathinfo.getMetaPath());
      checksumIn = new FileInputStream(metafile);

      // read and handle the common header here. For now just a version.
      BlockMetadataHeader header = BlockMetadataHeader
          .readHeader(new DataInputStream(checksumIn));
      short version = header.getVersion();
      if (version != FSDataset.METADATA_VERSION) {
        LOG.warn("Wrong version (" + version + ") for metadata file for "
            + blk + " ignoring ...");
      }
      DataChecksum checksum = header.getChecksum();
      localBlockReader = new BlockReaderLocal(conf, file, blk, token,
          startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
    } else {
      localBlockReader = new BlockReaderLocal(conf, file, blk, token,
          startOffset, length, pathinfo, dataIn);
    }
  } catch (IOException e) {
    // remove from cache
    localDatanodeInfo.removeBlockLocalPathInfo(blk);
    DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
        " from cache because local file " + pathinfo.getBlockPath() +
        " could not be opened.");
    throw e;
  } finally {
    if (localBlockReader == null) {
      if (dataIn != null) {
        dataIn.close();
      }
      if (checksumIn != null) {
        checksumIn.close();
      }
    }
  }
  return localBlockReader;
}
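BlockReaderLocal is only selected when the client enables short-circuit local reads and the datanode allows the reading user to access block paths directly. A hedged configuration sketch follows; the property names are from the branch-1-era short-circuit read feature (HDFS-2246) and should be verified against the exact release in use.

Configuration conf = new Configuration();
// Enable short-circuit reads on the client.
conf.setBoolean("dfs.client.read.shortcircuit", true);
// Controls the skipChecksum flag computed by shortCircuitChecksum(conf) above.
conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
// On the datanode, the local user must be whitelisted to receive block paths.
conf.set("dfs.block.local-path-access.user", "hbase");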
Project: RDFS
File: BlockReaderLocal.java
/**
* The only way this object can be instantiated.
*/
public static BlockReaderLocal newBlockReader(Configuration conf, String file,
    int namespaceid, Block blk, DatanodeInfo node, long startOffset, long length,
    DFSClientMetrics metrics, boolean verifyChecksum,
    boolean clearOsBuffer) throws IOException {
  // check in cache first
  BlockPathInfo pathinfo = cache.get(blk);

  if (pathinfo == null) {
    // cache the connection to the local data for eternity.
    if (datanode == null) {
      datanode = DFSClient.createClientDNProtocolProxy(node, conf, 0);
    }
    // make RPC to local datanode to find local pathnames of blocks
    if (datanode.isMethodSupported("getBlockPathInfo", int.class, Block.class)) {
      pathinfo = datanode.getProxy().getBlockPathInfo(namespaceid, blk);
    } else {
      pathinfo = datanode.getProxy().getBlockPathInfo(blk);
    }
    if (pathinfo != null) {
      cache.put(blk, pathinfo);
    }
  }

  // check to see if the file exists. It may so happen that the
  // HDFS file has been deleted and this block-lookup is occurring
  // on behalf of a new HDFS file. In this case, we remove this entry
  // from the cache. The next call to this method will repopulate the cache.
  try {
    // get a local file system
    File blkfile = new File(pathinfo.getBlockPath());
    FileInputStream dataIn = new FileInputStream(blkfile);

    if (LOG.isDebugEnabled()) {
      LOG.debug("New BlockReaderLocal for file " +
          blkfile + " of size " + blkfile.length() +
          " startOffset " + startOffset +
          " length " + length);
    }

    if (verifyChecksum) {
      // get the metadata file
      File metafile = new File(pathinfo.getMetaPath());
      FileInputStream checksumIn = new FileInputStream(metafile);

      // read and handle the common header here. For now just a version.
      BlockMetadataHeader header = BlockMetadataHeader.readHeader(
          new DataInputStream(checksumIn), new PureJavaCrc32());
      short version = header.getVersion();
      if (version != FSDataset.METADATA_VERSION) {
        LOG.warn("Wrong version (" + version + ") for metadata file for "
            + blk + " ignoring ...");
      }
      DataChecksum checksum = header.getChecksum();

      return new BlockReaderLocal(conf, file, blk, startOffset, length,
          pathinfo, metrics, checksum, verifyChecksum, dataIn, checksumIn,
          clearOsBuffer);
    } else {
      return new BlockReaderLocal(conf, file, blk, startOffset, length,
          pathinfo, metrics, dataIn, clearOsBuffer);
    }
  } catch (FileNotFoundException e) {
    cache.remove(blk); // remove from cache
    DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
        " from cache because local file " +
        pathinfo.getBlockPath() +
        " could not be opened.");
    throw e;
  }
}
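The static cache consulted at the top of this method maps a Block to the BlockPathInfo returned by the datanode RPC. A minimal sketch of the simplest implementation that supports the get/put/remove calls used above is shown below; the real RDFS code may bound the cache or add eviction, so treat this as an assumption for illustration.

private static final ConcurrentHashMap<Block, BlockPathInfo> cache =
    new ConcurrentHashMap<Block, BlockPathInfo>();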