微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Hadoop--MapReduce流量计算

文章目录

一. 使用的设备和搭建的环境

  1. 三台centos7 虚拟机
  2. Xshell连接虚拟机
  3. IDEA_2018
  4. hdfs集群
  5. yarn集群
  6. Chrome, FireFox浏览器
  7. 网易云

start-dfs.sh 启动hdfs集群

start-yarn.sh 启动yarn集群

当前的环境配置

在这里插入图片描述

二. 统计各个手机号的上传和下载流量总和

统计的数据

每行数据的第二列数据是手机号,倒数第三列表示上行流量,倒数第二列表示下行流量

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	    0	    200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	    1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	    0	    200
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	    0	    200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	    1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	    0	    200

最终要求的结果

13726230503	 上传流量:4962  下载流量:49362  总数据流量:  54324
13826544101	 上传流量:528  下载流量:0  总数据流量:  528
13926251106	 上传流量:480  下载流量:0  总数据流量:  480
13926435656	 上传流量:264  下载流量:3024  总数据流量:  3288

1. 创建数据文件上传到HDFS文件系统中

vim access.log

在这里插入图片描述

创建目录 hdfs dfs -mkdir -p /accesslog

上传文件到目录 hdfs dfs -put access.log /accesslog

在这里插入图片描述

2. 编写mapreduce的job作业完成统计

pom.xml 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <hadoop.version>2.9.2</hadoop.version>
        <!--<packaging>jar</packaging>-->
    </properties>
    <groupId>com.xizi</groupId>
    <artifactId>AccessLogJob</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--hadoop公共依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--hadoop client 依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--map reduce-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- 在打包插件中指定main class 信息 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <outputDirectory>${basedir}/target</outputDirectory>
                    <archive>
                        <manifest>
                            <mainClass>com.xizi.accessLogJob.AccessLogJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>


            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <url>scp://root:123456@自己虚拟机的ip/root</url>
                    <commands>
                        <!-- 通过sh 执行shell脚本文件 -->
                        <command>nohup hadoop-2.9.2/bin/hadoop jar ${project.build.finalName}.jar > /root/mapreduce.out 2>&amp;1 &amp; </command>
                    </commands>
                    <displayCommandOutputs>true</displayCommandOutputs>
                </configuration>
            </plugin>

        </plugins>

        <!--扩展maven的插件中加入ssh插件-->
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>
    </build>

</project>

Job 工作代码

package com.xizi.accessLogJob;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.textoutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;

//统计手机流量
public class AccessLogJob extends Configured implements Tool {

    private static Logger logger = Logger.getLogger(AccessLogJob.class);

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogJob(),args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        //创建job作业
        Job job = Job.getInstance(getConf(), "access-log");
        job.setJarByClass(AccessLogJob.class);

        //设置InputFormate
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //设置map
        job.setMapperClass(AccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //shuffle  无须设置 自动完成

        //设置reduce
        job.setReducerClass(AccessLogReduce.class);
        job.setoutputKeyClass(Text.class);
        job.setoutputValueClass(Text.class);

        //设置Output Format
        job.setoutputFormatClass(textoutputFormat.class);
        Path res = new Path("/accesslog/res");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)) {
            fileSystem.delete(res,true);
        }
        textoutputFormat.setoutputPath(job, res);

        //提交job作业
        boolean status = job.waitForCompletion(true);
        System.out.println("本次作业执行状态 = " + status);

        return 0;
    }



    public static class AccessLogMap extends Mapper<LongWritable, Text,Text,Text>{

        @Override //参数1:行首字母偏移量  参数2:当前row数据 参数3:map输出上下文
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            //输出key 为手机号  值为: 每个手机号"上传-下载流量"格式文本
            context.write(new Text(values[1]),new Text(values[values.length-3]+"-"+values[values.length-2]));

            logger.info("手机号: "+values[1]+"  流量格式:"+values[values.length-3]+"-"+values[values.length-2]);
        }
    }
    //reduce
    public static class AccessLogReduce extends Reducer<Text,Text,Text,Text>{
        @Override //参数1:map的key  参数2:相当key的数组   参数3:Reduce输出的上下文
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int uploadData = 0; //保存上传流量
            int downData = 0;   //保存下载流量
            for (Text value : values) {
                String[] datas = value.toString().split("-");
                uploadData+= Integer.valueOf(datas[0]);
                downData+= Integer.valueOf(datas[1]);
            }
            int total = uploadData + downData;//保存总流量

            //输出
            context.write(key,new Text(" 上传流量:"+uploadData+"  下载流量:"+downData+"  总数据流量:  "+total));
            logger.info("手机号: "+key+" 上传流量:"+uploadData+"  下载流量:"+downData+"  总数据流量:  "+total);
        }
    }

}

3. 打包jar上传到服务器

maven 打包jar 上传到虚拟机上

利用wagon 插件 自动打包上传到服务器上

在这里插入图片描述

执行jar命令 yarn jar AccessLogJob-1.0-SNAPSHOT.jar

在这里插入图片描述

也可以利用wagon的插件进行自动上传和执行

在这里插入图片描述

4. 查看执行结果

hdfs dfs -text /accesslog/res/part-r-00000

在这里插入图片描述

统计结果

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐