This article shares the example code for WordCount and WordMean in Hadoop. Hopefully you will take something away from it; let's work through the code together.
Note
The Hadoop source code can be downloaded from the official Hadoop website.
In the listings below, comments starting with /** come from the original source; comments starting with // are the author's own notes.
WordCount:
package hadoop1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // split the line into words on whitespace, like "\\s" in a regular expression
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // emit <word, 1>; context.write() is how map and reduce produce their output
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      // for each word, sum its values:
      // <word1,1>,...,<word1,1> ------> <word1,n>, <word2,1>,...,<word2,1> ------> <word2,n>
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      // emit <word1, sum1>, <word2, sum2>, ...
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // parse the input/output paths; format: otherArgs = {input1, input2, ..., output}
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // create the Job instance
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // the combiner acts like a reducer on each map node to reduce network traffic;
    // not every job is suitable for a combiner.
    // here: <key,1>,...,<key,1> ------> <key,n>
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // the job's output key/value types (the map output types are the same here)
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // add each input path to support multiple inputs (input1, input2, ...)
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    // exit with 0 on success, non-zero otherwise
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
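To see the <word,1> ------> <word,n> flow from the comments without submitting a job, the same grouping can be simulated in plain Java. The sketch below is illustrative only: the class name LocalWordCountDemo and the sample lines are made up, and a HashMap stands in for the shuffle-and-reduce step.

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class LocalWordCountDemo {
  public static void main(String[] args) {
    // two "input lines", as if read from the input files
    String[] lines = { "hello hadoop", "hello world hello" };

    // stands in for the reduce side: sums the 1s emitted for each word
    Map<String, Integer> counts = new HashMap<>();
    for (String line : lines) {
      // map side: split each line on whitespace and emit <word, 1>
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        counts.merge(itr.nextToken(), 1, Integer::sum);
      }
    }

    // prints hello=3, hadoop=1, world=1 (order may vary)
    counts.forEach((word, sum) -> System.out.println(word + "\t" + sum));
  }
}

On a real cluster the class would be packaged into a jar and submitted with hadoop jar, for example: hadoop jar wc.jar hadoop1.WordCount <in> <out> (the jar name here is only an example).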
WordMean:
package hadoop2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;

public class WordMean extends Configured implements Tool {

  private double mean = 0;

  private final static Text COUNT = new Text("count");
  private final static Text LENGTH = new Text("length");
  private final static LongWritable ONE = new LongWritable(1);

  /**
   * Maps words from line of text into 2 key-value pairs; one key-value pair for
   * counting the word, another for counting its length.
   */
  public static class WordMeanMapper extends Mapper<Object, Text, Text, LongWritable> {

    private LongWritable wordLen = new LongWritable();

    /**
     * Emits 2 key-value pairs for counting the word and its length. Outputs are
     * (Text, LongWritable).
     *
     * @param value
     *          This will be a line of text coming in from our input file.
     */
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String string = itr.nextToken();
        // the line is tokenized into word1, word2, ..., wordn;
        // for each token (word) split on whitespace, emit two pairs:
        // <"length", string.length()> and <"count", 1>
        this.wordLen.set(string.length());
        context.write(LENGTH, this.wordLen);
        context.write(COUNT, ONE);
      }
    }
  }

  /**
   * Performs integer summation of all the values for each key.
   */
  public static class WordMeanReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // LongWritable is the Hadoop counterpart of Java's Long;
    // Hadoop defines its own writable types for serialization and deserialization
    private LongWritable sum = new LongWritable();

    /**
     * Sums all the individual values within the iterator and writes them to the
     * same key.
     *
     * @param key
     *          This will be one of 2 constants: LENGTH or COUNT.
     * @param values
     *          This will be an iterator of all the values associated with that
     *          key.
     */
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // for the two constants "count" and "length", sum the values of each:
      // <count,1>,...,<count,1> ------> <count,n>
      // <length,3>,...,<length,4> ------> <length,m>
      int theSum = 0;
      for (LongWritable val : values) {
        theSum += val.get();
      }
      sum.set(theSum);
      context.write(key, sum);
    }
  }

  /**
   * Reads the output file and parses the summation of lengths, and the word
   * count, to perform a quick calculation of the mean.
   *
   * @param path
   *          The path to find the output file in. Set in main to the output
   *          directory.
   * @throws IOException
   *           If it cannot access the output directory, we throw an exception.
   */
  private double readAndCalcMean(Path path, Configuration conf) throws IOException {
    // read from the reducer's output
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(path, "part-r-00000");

    if (!fs.exists(file))
      throw new IOException("Output not found!");

    BufferedReader br = null;

    // mean = total length (m from reduce) / number of words (n from reduce)
    try {
      // BufferedReader decorates InputStreamReader to add .readLine() and similar methods
      br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));

      long count = 0;
      long length = 0;

      String line;
      while ((line = br.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(line);

        // grab the type token ------ to tell "count" and "length" apart
        String type = st.nextToken();

        // differentiate
        if (type.equals(COUNT.toString())) {
          String countLit = st.nextToken();
          count = Long.parseLong(countLit);
          System.out.println("The count is: " + count); //~ added by the author: total word count n
        } else if (type.equals(LENGTH.toString())) {
          String lengthLit = st.nextToken();
          length = Long.parseLong(lengthLit);
          System.out.println("The length is: " + length); //~ added by the author: total word length m
        }
      }

      double theMean = (((double) length) / ((double) count));
      System.out.println("The mean is: " + theMean);
      return theMean;
    } finally {
      if (br != null) {
        br.close();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordMean(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordmean <in> <out>");
      return 0;
    }

    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "word mean");
    job.setJarByClass(WordMean.class);
    job.setMapperClass(WordMeanMapper.class);
    job.setCombinerClass(WordMeanReducer.class);
    job.setReducerClass(WordMeanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputpath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputpath);
    boolean result = job.waitForCompletion(true);
    mean = readAndCalcMean(outputpath, conf);
    return (result ? 0 : 1);
  }

  /**
   * Only valuable after run() called.
   *
   * @return Returns the mean value.
   */
  public double getMean() {
    return mean;
  }
}
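Note that the mean itself is not computed inside MapReduce: readAndCalcMean() reads the single reducer output file (part-r-00000), picks out the "count" and "length" lines, and divides. The arithmetic can be checked with a small self-contained sketch; the class name LocalWordMeanDemo and the sample line below are made up, only the formula mirrors the example.

import java.util.StringTokenizer;

public class LocalWordMeanDemo {
  public static void main(String[] args) {
    // one "input line", as if read from the input file
    String line = "hadoop word mean example";

    long count = 0;   // sum of the <"count", 1> pairs
    long length = 0;  // sum of the <"length", len> pairs

    // map and reduce collapsed: for every token emit both pairs and sum them
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      String word = itr.nextToken();
      length += word.length();  // <"length", word.length()>
      count += 1;               // <"count", 1>
    }

    // same arithmetic as readAndCalcMean(): mean = total length / word count
    double mean = ((double) length) / ((double) count);
    System.out.println("count=" + count + " length=" + length + " mean=" + mean);
    // for this line: count=4, length=21, mean=5.25
  }
}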
Having read this article, you should now have a reasonable understanding of the WordCount and WordMean example code in Hadoop. To learn more, follow the 编程之家 industry news channel. Thanks for reading!