如何解决在hadoop中执行另一项工作
| 我不明白如何使工作使用相同的输出目录 目录以在其中写入其他文件。我尝试过通勤 并注释这条线,但仍然无法正常工作。我得到以下 当我评论它时例外。无论如何,我在代码中尝试运行两个 使用相同的reducer但使用不同的mapper的单独作业。 编辑:不,一个工作的输出不是另一个工作的输入,原因 我希望它们在同一文件夹中是因为它们是另一张地图的输入 减少我想做的工作。FileOutputFormat.setOutputPath(job,new Path(args[1]));
11/04/14 13:33:11 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread \"main\" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:770)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
at org.myorg.WordCount.main(WordCount.java:123)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
// if (otherArgs.length != 2) {
// System.err.println(\"Usage: wordcount <in> <out>\");
// System.exit(2);
// }
Job job = new Job(conf,\"Job1\");
job.setJarByClass(WordCount.class);
job.setMapperClass(Mapper1.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.waitForCompletion(true);
FileSystem hdfs = FileSystem.get(conf);
Path fromPath = new Path(\"/user/hadoop/output/part-r-00000\");
Path toPath = new Path(\"/user/hadoop/output/output1\");
// renaming to output1
boolean isRenamed = hdfs.rename(fromPath,toPath);
if (isRenamed)
{
System.out.println(\"Renamed to /user/hadoop/output/output1!\");
}
else
{
System.out.println(\"Not Renamed!\");
}
job = new Job(conf,\"Job2\");
job.setJarByClass(WordCount.class);
job.setMapperClass(Mapper2.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// FileInputFormat.addInputPath(job,new Path(args[0]));
// FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit( job.waitForCompletion(true) ? 0 : 1);
}
将以下内容添加到我的代码中会导致其他错误:
job.setInputFormatClass(FileInputFormat.class);
job.setOutputFormatClass(FileOutputFormat.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
Exception in thread \"main\" java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:768)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
at org.myorg.WordCount.main(WordCount.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
... 9 more
解决方法
所有文件都不必位于同一目录中。您的第三项作业可以具有多个输入路径(目录或文件)。
看到
FileInputFormat.addInputPaths(JobConf conf,String commaSeparatedPaths)
和朋友...
,您必须为第二个作业提供一个新的Configuration对象。顺便说一句,为什么不对输出格式使用这些方法?
job.setInputFormatClass(FileInputFormat.class);
job.setOutputFormatClass(FileOutputFormat.class);
这是一篇关于递归工作的博客文章,这与您正在做的事情完全相同。
http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html
编辑:
顺便说一句,您打算将一个文件夹写入到哪个文件夹中,该文件夹既是前一个作业的输出又是新作业的输入?这只会导致另一个异常,例如:\“输出路径已存在\”。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。