前面分享了使用MapReduce实现wordcount单词统计的方法与原理。本篇博主将继续分享一个移动流量分析的经典案例，帮助大家在实际工作中理解和使用Hadoop平台。

        一、需求

            以下是一段移动流量日志，我们需要据此统计出每个手机号对应的上行总流量、下行总流量和总流量。

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

           

        二、实现

            字段说明：上面日志中的各字段以制表符分隔，其中第二列为手机号，倒数第三列为上行流量，倒数第二列为下行流量（程序从行尾倒数定位这两列）。

hdfs dfs -mkdir -p /user/hadoop/flowcount

            FlowBean(分析输出结果bean)

package com.empire.hadoop.mr.flowcount;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Traffic statistics bean holding the upstream flow, the downstream flow and their total.
 * <p>
 * MapReduce serializes intermediate values to move them between the map and reduce phases,
 * so this class implements {@link Writable}. If instances ever need to be sorted (e.g. used
 * as a map output key), implement {@code WritableComparable} instead.
 * <p>
 * The two-arg constructor, {@link #set(long, long)}, {@link #setUpFlow(long)} and
 * {@link #setdFlow(long)} all keep {@code sumFlow == upFlow + dFlow}.
 * {@link #setSumFlow(long)} can still override the total directly.
 *
 * @author arron 2018-11-24 21:40:40
 */
public class FlowBean implements Writable {

    /** Upstream flow. */
    private long upFlow;
    /** Downstream flow. */
    private long dFlow;
    /** Total flow; normally upFlow + dFlow. */
    private long sumFlow;

    /**
     * Creates an empty bean. Deserialization instantiates the bean reflectively through a
     * no-arg constructor before calling {@link #readFields(DataInput)}. Because a two-arg
     * constructor exists, this one must be declared explicitly.
     */
    public FlowBean() {
    }

    /**
     * Creates a bean with the given flows; the total is their sum.
     *
     * @param upFlow upstream flow
     * @param dFlow  downstream flow
     */
    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    /**
     * Replaces both flows and recomputes the total. This lets a caller reuse one instance
     * (for example as a mapper's output value) instead of allocating a new bean per record.
     *
     * @param upFlow upstream flow
     * @param dFlow  downstream flow
     */
    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    /**
     * Sets the upstream flow and recomputes the total. Without the recompute, the total
     * would silently keep its old value.
     */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.sumFlow = upFlow + this.dFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    /**
     * Sets the downstream flow and recomputes the total. Without the recompute, the total
     * would silently keep its old value.
     */
    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
        this.sumFlow = this.upFlow + dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    /** Overrides the total directly, without touching the up/down flows. */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serializes the bean as three longs, in the order upFlow, dFlow, sumFlow.
     */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserializes the bean. Fields must be read in exactly the same order that
     * {@link #write(DataOutput)} writes them.
     */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Returns "upFlow\tdFlow\tsumFlow". This is the text written after the key in the job's
     * output files.
     */
    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

}

          mapreduce主程序

package com.empire.hadoop.mr.flowcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Analyses mobile traffic logs and computes, for every phone number, its total upstream
 * flow, total downstream flow and overall total flow.
 * <p>
 * Arguments: {@code <input dir> <output dir>}.
 *
 * @author arron 2018-11-24 21:43:23
 */
public class FlowCount {

    /** Counter group under which skipped input lines are reported. */
    static final String COUNTER_GROUP = "FlowCount";
    /** Counter incremented once for every input line the mapper cannot parse. */
    static final String MALFORMED_LINES = "MALFORMED_LINES";

    /**
     * Parses one tab-separated log line and emits {@code <phone number, FlowBean(up, down)>}.
     * <p>
     * Column layout: index 1 is the phone number, the third-from-last column is the upstream
     * flow and the second-from-last column is the downstream flow. Some lines cannot be
     * parsed: they are too short, have an empty phone number, or have non-numeric flow
     * columns. Such lines are skipped and counted under
     * {@link #COUNTER_GROUP}/{@link #MALFORMED_LINES} instead of failing the whole task.
     */
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        /**
         * Smallest field count at which the phone column (index 1) does not overlap the
         * three trailing columns (upFlow, dFlow, last column).
         */
        private static final int MIN_FIELDS = 5;

        // Reused across map() calls. context.write() serializes the key and value right
        // away, so both can be overwritten for the next record instead of being reallocated.
        private final Text outKey = new Text();
        private final FlowBean outValue = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (!parseInto(fields)) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            context.write(outKey, outValue);
        }

        /**
         * Fills {@link #outKey} and {@link #outValue} from the split fields of one line.
         *
         * @param fields the line split on tab characters
         * @return {@code false} if the line is too short, has an empty phone number or has
         *         non-numeric flow columns; {@code true} once the output objects are filled
         */
        private boolean parseInto(String[] fields) {
            if (fields.length < MIN_FIELDS) {
                return false;
            }
            String phoneNbr = fields[1].trim();
            if (phoneNbr.isEmpty()) {
                return false;
            }
            long upFlow;
            long dFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3].trim());
                dFlow = Long.parseLong(fields[fields.length - 2].trim());
            } catch (NumberFormatException ignored) {
                // Malformed flow value: the caller records it in the job counters.
                return false;
            }
            outKey.set(phoneNbr);
            // Set all three fields explicitly, so the total is right even if the
            // FlowBean setters do not recompute it.
            outValue.setUpFlow(upFlow);
            outValue.setdFlow(dFlow);
            outValue.setSumFlow(upFlow + dFlow);
            return true;
        }

    }

    /**
     * Sums the upstream and downstream flows of all records for one phone number.
     * <p>
     * Its input and output types are identical and the summation is associative and
     * commutative, so the same class is also registered as the combiner.
     */
    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Input for one call: <13726230503, bean1>, <13726230503, bean2>, ... (one phone number).
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            long sumUpFlow = 0;
            long sumDFlow = 0;

            // The iterator may hand back the same FlowBean object on every step, so read
            // the numbers immediately and never keep references to the beans themselves.
            for (FlowBean bean : values) {
                sumUpFlow += bean.getUpFlow();
                sumDFlow += bean.getdFlow();
            }

            context.write(key, new FlowBean(sumUpFlow, sumDFlow));
        }

    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} = input directory, {@code args[1]} = output directory
     *             (must not already exist)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCount <input dir> <output dir>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // To submit to YARN directly (e.g. from an IDE), set for example:
        //   conf.set("mapreduce.framework.name", "yarn");
        //   conf.set("yarn.resourcemanager.hostname", "mini1");
        Job job = Job.getInstance(conf);

        // Locate the job jar from this class (alternative: job.setJar("/path/to/jar")).
        job.setJarByClass(FlowCount.class);

        // Mapper / combiner / reducer implementations.
        job.setMapperClass(FlowCountMapper.class);
        // Pre-aggregate map output locally to shrink shuffle traffic. The final result is
        // unchanged because the reducer only sums.
        job.setCombinerClass(FlowCountReducer.class);
        job.setReducerClass(FlowCountReducer.class);

        // Key/value types of the mapper output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input directory of the raw logs and output directory of the results.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration and jar, then block until it finishes, printing
        // progress (job.submit() would return immediately instead).
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}

        三、打包运行

               按照上一篇wordcount博客中的方式打包并运行，运行效果如下：

18/11/25 06:03:38 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/11/25 06:03:39 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/11/25 06:03:39 INFO input.FileInputFormat: Total input files to process : 5
18/11/25 06:03:39 INFO mapreduce.JobSubmitter: number of splits:5
18/11/25 06:03:40 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/11/25 06:03:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543096217465_0001
18/11/25 06:03:41 INFO impl.YarnClientImpl: Submitted application application_1543096217465_0001
18/11/25 06:03:41 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543096217465_0001/
18/11/25 06:03:41 INFO mapreduce.Job: Running job: job_1543096217465_0001
18/11/25 06:03:51 INFO mapreduce.Job: Job job_1543096217465_0001 running in uber mode : false
18/11/25 06:03:51 INFO mapreduce.Job:  map 0% reduce 0%
18/11/25 06:04:00 INFO mapreduce.Job:  map 20% reduce 0%
18/11/25 06:04:13 INFO mapreduce.Job:  map 100% reduce 0%
18/11/25 06:04:14 INFO mapreduce.Job:  map 100% reduce 100%
18/11/25 06:04:15 INFO mapreduce.Job: Job job_1543096217465_0001 completed successfully
18/11/25 06:04:15 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=4171
                FILE: Number of bytes written=1193767
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=11574
                HDFS: Number of bytes written=594
                HDFS: Number of read operations=18
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Killed map tasks=1
                Launched map tasks=5
                Launched reduce tasks=1
                Data-local map tasks=5
                Total time spent by all maps in occupied slots (ms)=79442
                Total time spent by all reduces in occupied slots (ms)=11115
                Total time spent by all map tasks (ms)=79442
                Total time spent by all reduce tasks (ms)=11115
                Total vcore-milliseconds taken by all map tasks=79442
                Total vcore-milliseconds taken by all reduce tasks=11115
                Total megabyte-milliseconds taken by all map tasks=81348608
                Total megabyte-milliseconds taken by all reduce tasks=11381760
        Map-Reduce Framework
                Map input records=110
                Map output records=110
                Map output bytes=3945
                Map output materialized bytes=4195
                Input split bytes=624
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=4195
                Reduce input records=110
                Reduce output records=21
                Spilled Records=220
                Shuffled Maps =5
                Failed Shuffles=0
                Merged Map outputs=5
                GC time elapsed (ms)=1587
                CPU time spent (ms)=4710
                Physical memory (bytes) snapshot=878612480
                Virtual memory (bytes) snapshot=5069615104
                Total committed heap usage (bytes)=623616000
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=10950
        File Output Format Counters
                Bytes Written=594

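                分析结果（输入目录中共有5个日志文件——见运行日志中的“Total input files to process : 5”与“Map input records=110”，即上面22条记录×5，因此下面每个手机号的流量都是上述单份日志统计值的5倍）：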

[hadoop@centos-aaron-h1 ~]$ hadoop fs -ls /user/hadoop/flowcountount
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-11-25 06:04 /user/hadoop/flowcountount/_SUCCESS
-rw-r--r--   2 hadoop supergroup        594 2018-11-25 06:04 /user/hadoop/flowcountount/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hadoop fs -cat /user/hadoop/flowcountount/part-r-00000
13480253104     900     900     1800
13502468823     36675   551745  588420
13560436666     5580    4770    10350
13560439658     10170   29460   39630
13602846565     9690    14550   24240
13660577991     34800   3450    38250
13719199419     1200    0       1200
13726230503     12405   123405  135810
13726238888     12405   123405  135810
13760778710     600     600     1200
13826544101     1320    0       1320
13922314466     15040   18600   33640
13925057413     55290   241215  296505
13926251106     1200    0       1200
13926435656     660     7560    8220
15013685858     18295   17690   35985
15920133257     15780   14680   30460
15989002119     9690    900     10590
18211575961     7635    10530   18165
18320173382     47655   12060   59715
84138413        20580   7160    27740

             最后寄语：以上是博主本次文章的全部内容，如果大家觉得博主的文章还不错，请点赞；如果您对博主的其他服务器、大数据技术文章或者博主本人感兴趣，请关注博主博客，并且欢迎随时跟博主沟通交流。

11-26 23:31