I. Equipment and environment setup
- Three CentOS 7 virtual machines
- Xshell to connect to the virtual machines
- IDEA 2018
- An HDFS cluster
- A YARN cluster
- Chrome and Firefox browsers
- NetEase Cloud Music
start-dfs.sh starts the HDFS cluster
start-yarn.sh starts the YARN cluster
The current environment configuration:
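To confirm both clusters are actually up before running the job, jps on each node should list the Hadoop daemons (a minimal check; which daemon appears on which host depends on your cluster layout):

jps
# Across the three nodes you would typically see: NameNode, SecondaryNameNode,
# DataNode, ResourceManager, NodeManager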
II. Total upload and download traffic for each phone number
The input data
Columns are tab-separated. In each row, the second column is the phone number, the third-from-last column is the upload (upstream) traffic, and the second-from-last column is the download (downstream) traffic.
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
The expected result
13726230503 Upload traffic:4962 Download traffic:49362 Total traffic: 54324
13826544101 Upload traffic:528 Download traffic:0 Total traffic: 528
13926251106 Upload traffic:480 Download traffic:0 Total traffic: 480
13926435656 Upload traffic:264 Download traffic:3024 Total traffic: 3288
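Note that every record appears twice in the sample file, so each phone number's totals are double the per-record values: for 13726230503, upload = 2 × 2481 = 4962, download = 2 × 24681 = 49362, and total = 4962 + 49362 = 54324.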
1. Create the data file and upload it to HDFS
Create the file: vim access.log
Create the target directory: hdfs dfs -mkdir -p /accesslog
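Then upload the file into that directory and confirm it arrived (assuming access.log was saved in the current working directory):

hdfs dfs -put access.log /accesslog
hdfs dfs -ls /accesslog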
2. Write the MapReduce job that performs the statistics
pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xizi</groupId>
    <artifactId>AccessLogJob</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <hadoop.version>2.9.2</hadoop.version>
    </properties>

    <dependencies>
        <!-- Hadoop common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Hadoop HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- MapReduce -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Record the main class in the jar's manifest -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <outputDirectory>${basedir}/target</outputDirectory>
                    <archive>
                        <manifest>
                            <mainClass>com.xizi.accessLogJob.AccessLogJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <url>scp://root:123456@your-vm-ip/root</url>
                    <commands>
                        <!-- Run the uploaded jar on the server over SSH -->
                        <command>nohup hadoop-2.9.2/bin/hadoop jar ${project.build.finalName}.jar > /root/mapreduce.out 2>&amp;1 &amp;</command>
                    </commands>
                    <displayCommandOutputs>true</displayCommandOutputs>
                </configuration>
            </plugin>
        </plugins>
        <!-- Register the SSH wagon extension so scp:// URLs work -->
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>
    </build>
</project>
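With this configuration, a single Maven invocation can build the jar, copy it to the server, and launch it there. The goal names below are my assumption based on how wagon-maven-plugin 1.0 maps the fromFile/url settings to upload-single and the commands setting to sshexec:

mvn clean package wagon:upload-single wagon:sshexec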
The job code
package com.xizi.accessLogJob;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;

// Compute per-phone-number traffic totals
public class AccessLogJob extends Configured implements Tool {
    private static Logger logger = Logger.getLogger(AccessLogJob.class);

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogJob(), args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        // Create the job
        Job job = Job.getInstance(getConf(), "access-log");
        job.setJarByClass(AccessLogJob.class);
        // Set the InputFormat
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/accesslog/access.log"));
        // Set the mapper
        job.setMapperClass(AccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Shuffle needs no configuration; it runs automatically
        // Set the reducer
        job.setReducerClass(AccessLogReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the OutputFormat
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res");
        // Delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(res)) {
            fileSystem.delete(res, true);
        }
        TextOutputFormat.setOutputPath(job, res);
        // Submit the job
        boolean status = job.waitForCompletion(true);
        System.out.println("Job completed with status = " + status);
        return status ? 0 : 1;
    }

    public static class AccessLogMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override // arg 1: byte offset of the line; arg 2: the current row; arg 3: map output context
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // Output key: the phone number; value: "upload-download" text for this record
            context.write(new Text(values[1]), new Text(values[values.length - 3] + "-" + values[values.length - 2]));
            logger.info("phone: " + values[1] + " traffic: " + values[values.length - 3] + "-" + values[values.length - 2]);
        }
    }

    // reducer
    public static class AccessLogReduce extends Reducer<Text, Text, Text, Text> {
        @Override // arg 1: the map output key; arg 2: all values for that key; arg 3: reduce output context
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int uploadData = 0; // accumulated upload traffic
            int downData = 0;   // accumulated download traffic
            for (Text value : values) {
                String[] datas = value.toString().split("-");
                uploadData += Integer.valueOf(datas[0]);
                downData += Integer.valueOf(datas[1]);
            }
            int total = uploadData + downData; // total traffic
            // Emit the result
            context.write(key, new Text(" Upload traffic:" + uploadData + " Download traffic:" + downData + " Total traffic: " + total));
            logger.info("phone: " + key + " Upload traffic:" + uploadData + " Download traffic:" + downData + " Total traffic: " + total);
        }
    }
}
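To double-check the field-index arithmetic the mapper relies on (phone number in column 2, upload and download in the third- and second-from-last columns), here is a tiny standalone sketch (ParseCheck is a hypothetical class, not part of the job) that runs without any Hadoop cluster:

// Standalone sanity check of the mapper's parsing logic (plain Java, no Hadoop).
public class ParseCheck {
    public static void main(String[] args) {
        // One tab-separated record from access.log
        String row = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82\t24\t27\t2481\t24681\t200";
        String[] values = row.split("\t");
        // Same indices the mapper uses
        System.out.println(values[1] + " upload=" + values[values.length - 3]
                + " download=" + values[values.length - 2]);
        // Prints: 13726230503 upload=2481 download=24681
    }
}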
3. Package the jar and upload it to the server
Package the jar with Maven and upload it to the virtual machine
Run the jar: yarn jar AccessLogJob-1.0-SNAPSHOT.jar (no main-class argument is needed, because the jar manifest configured in pom.xml already names com.xizi.accessLogJob.AccessLogJob)
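While the job runs, its progress can also be checked from the command line with the standard YARN CLI:

yarn application -list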
4. Check the result
hdfs dfs -text /accesslog/res/part-r-00000
The computed totals match the expected result shown above.