微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

教你怎么使用hadoop来提取文件中的指定内容

发现有很多小伙伴不会使用hadoop来提取文件中的指定内容,今天特地整理了这篇文章,文中有非常详细的代码示例,对不会这个方法的小伙伴们有很好地帮助,需要的朋友可以参考下

目录

一、需求

二、步骤

三、结果

一、需求

把以下txt中含“baidu”字符串的链接输出一个文件,否则输出到另外一个文件

二、步骤

1.LogMapper.java

package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 不做任何处理 context.write(value,NullWritable.get()); } }

2.LogReducer.java

package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (NullWritable value : values) { context.write(key,NullWritable.get()); } } }

3.logoutputFormat.java

package com.whj.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class logoutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { LogRecordWriter lrw = new LogRecordWriter(job); return lrw; } }

4.LogRecordWriter.java

package com.whj.mapreduce.outputformat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IoUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class LogRecordWriter extends RecordWriter { private FSDataOutputStream baiduOut;//ctrl+alt+f private FSDataOutputStream otherOut; public LogRecordWriter(TaskAttemptContext job) throws IOException { //创建两条流 FileSystem fs = FileSystem.get(job.getConfiguration()); baiduOut = fs.create(new Path("D:\temp\outputformat.log")); otherOut = fs.create(new Path("D:\temp\other.log")); } @Override public void write(Text key, NullWritable nullWritable) throws IOException, InterruptedException { // 具体写 String log = key.toString(); if(log.contains("baidu")){ baiduOut.writeBytes(log+"n"); }else{ otherOut.writeBytes(log+"n"); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //关流 IoUtils.closeStream(baiduOut); IoUtils.closeStream(otherOut); } }

5.LogDriver.java

package com.whj.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setoutputKeyClass(Text.class); job.setoutputValueClass(NullWritable.class); //设置自定义的 outputformat job.setoutputFormatClass(logoutputFormat.class); FileInputFormat.setInputPaths(job, new Path("D:\input")); // 虽 然 我 们 自 定 义 了 outputformat , 但 是 因 为 我 们 的 outputformat 继承自fileoutputformat //而 fileoutputformat 要输出一个_SUCCESS 文件,所以在这还得指定一个输出目录 FileOutputFormat.setoutputPath(job, new Path("D:\temp\logoutput")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }

三、结果

到此这篇关于教你怎么使用hadoop来提取文件中的指定内容文章就介绍到这了,更多相关hadoop提取文件内容内容搜索编程之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程之家!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐