微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

MapReduce 的自定义类

自定义

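1)需要实现接口:Writable
2)需要实现它的两个抽象方法:
write() 序列化
readFields() 反序列化(注意读取字段的顺序必须与 write() 中写出的顺序一致)
3)需要提供一个 public 的无参构造器,因为框架反序列化时会通过反射创建对象。
注意:只实现了 Writable 的自定义类不能作为 map 输出的 key。key 在 shuffle 阶段需要排序,必须实现 WritableComparable(即额外实现 compareTo());作为 value 等其他位置则没有这个限制。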

案例:

package com.lee.define;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Aggregates per-phone-number traffic from a tab-separated flow log.
 *
 * <p>Map side: key = phone number (field index 1); value = a {@link FlowBean} built from the
 * upstream traffic (third-to-last field) and downstream traffic (second-to-last field).
 * Reduce side: all records with the same phone number are grouped together and their upstream
 * and downstream values are summed.
 */
public class Flow {
	/** Number of tab-separated fields a log line must have to be processed. */
	private static final int EXPECTED_FIELD_COUNT = 11;
	/** Index of the phone-number field within a log line. */
	private static final int PHONE_FIELD = 1;

	/**
	 * Parses one log line into (phone number, FlowBean(upflow, downflow)).
	 * Lines with the wrong field count are ignored; lines whose traffic fields are not valid
	 * longs are skipped and counted under the "Flow"/"MALFORMED_RECORDS" counter.
	 */
	static class myMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			if (fields.length != EXPECTED_FIELD_COUNT) {
				return;
			}

			long upflow;
			long downflow;
			try {
				upflow = Long.parseLong(fields[fields.length - 3].trim());
				downflow = Long.parseLong(fields[fields.length - 2].trim());
			} catch (NumberFormatException e) {
				// One bad record should not fail the whole task; skip it but keep it visible.
				context.getCounter("Flow", "MALFORMED_RECORDS").increment(1);
				return;
			}

			context.write(new Text(fields[PHONE_FIELD]), new FlowBean(upflow, downflow));
		}
	}

	/**
	 * Sums upstream and downstream traffic for one phone number and emits the totals.
	 */
	static class myReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context)
				throws IOException, InterruptedException {
			long upflow = 0;
			long downflow = 0;
			for (FlowBean flowBean : values) {
				// Read primitives immediately: the framework may reuse the FlowBean instance.
				upflow += flowBean.getUpflow();
				downflow += flowBean.getDownflow();
			}
			context.write(key, new FlowBean(upflow, downflow));
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args optional {@code <input path> <output path>}; when fewer than two arguments are
	 *     given, the original hard-coded paths are used
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		String input = args.length >= 2 ? args[0] : "E:\\flowin\\flow.log";
		String output = args.length >= 2 ? args[1] : "E:\\flowout";

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(Flow.class);

		job.setMapperClass(myMapper.class);
		job.setReducerClass(myReduce.class);

		// The map output types equal the final output types, so the job output types suffice
		// (map output key/value classes default to these when not set explicitly).
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}

jsperlee 发布了25 篇原创文章 · 获赞 0 · 访问量 637 私信 关注

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐