MapReduce编程练习3.0

MapReduce编程练习3.0MapReduce编程练习3.03.0需求:这次统计,也不排序,需要把数据按照手机归属地不同省份输出到不同文件中问题点:需要输出到不同的文件中需要按手机归属地进行分组解析:修改reducer的数量即可输出多个文件对于分组,Hadoop默认使用的是key的hash值对reducer个数取余分组,这里实现是通过调用默认的分组类:HashPartitioner进行分区的。…

大家好,欢迎来到IT知识分享网。

MapReduce编程练习3.0

3.0需求:这次统计,也不排序,需要把数据按照手机归属地不同省份输出到不同文件中

问题点:

  1. 需要输出到不同的文件中
  2. 需要按手机归属地进行分组

解析:

  1. 修改reducer的数量即可输出多个文件

  2. 对于分组,Hadoop默认使用的是key的hash值对reducer个数取余分组,这里实现是通过调用默认的分组类:HashPartitioner进行分区的。

  3. HashPartitioner类继承了Partitioner类,然后通过里面的getPartition方法进行分组。我们编写一个类,继承Partitioner类,改写里面的getPartition方法以实现需求。

    [外链图片转存失败(img-bj77KHL8-1568619283030)(D:\学习笔记\hadoop\保存图片\MapReduce编程练习\默认的hash分组.jpg)]

3.1新需求解决编程

1. 先编写一个分组类

package com.chinasofti.mapreducepractice.flowsum.partition;

import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/** * 分组类,继承Partitioner类 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { 
   
    public static HashMap<String, Integer>  provinceMap = new HashMap<String, Integer>();

    // 定义分组方式,将对应的值放在一个HashMap中
    static{ 
   
        provinceMap.put("134", 0);
        provinceMap.put("135", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("138", 4);
    }

    //重写分组方法,通过
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) { 
   
        // 这里是截取手机号的前三位数字来作为不同的省份
        Integer code = provinceMap.get(key.toString().substring(0, 3));

        if (code != null) { 
   
            return code;
        }
        return 5;
    }
}

2. Bean、map和reduce类不用改,直接使用第一次需求里的。

3. 将分组方式设置进driver里面来,使之生效。同时修改输出的文件数量,即设置reducer数量为6.

package com.chinasofti.mapreducepractice.flowsum.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowSumDriver { 
   
    public static void main(String[] args) throws Exception { 
   
        //通过Job来封装本次mr的相关信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //指定本次mr job jar包运行主类
        job.setJarByClass(FlowSumDriver.class);

        //指定本次mr 所用的mapper reducer类分别是什么
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        //指定本次mr mapper阶段的输出 k v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //指定本次mr 最终输出的 k v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置reduce数量为6
        // getPartition 返回的分区个数 = NumReduceTasks 正常执行
        // getPartition 返回的分区个数 > NumReduceTasks 报错:Illegal partition
        // getPartition 返回的分区个数 < NumReduceTasks 可以执行 ,多出空白文件
        job.setNumReduceTasks(6);

        // 将分组设置添加进来
        //这里指定使用我们自定义的分区组件
        job.setPartitionerClass(ProvincePartitioner.class);

        /* * 本地运行语句 * 注意本地运行时输出目录也不能存在,否则也会报错 */
        FileInputFormat.setInputPaths(job,new Path("D:\\Practice_File\\hadoop_practice\\MapReduce\\flowsum\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\Practice_File\\hadoop_practice\\MapReduce\\flowsum\\outputProvince"));
// job.submit();
        //提交程序 并且监控打印程序执行情况
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

[外链图片转存失败(img-i4yBaGLR-1568619283031)(D:\学习笔记\hadoop\保存图片\MapReduce编程练习\按省份分组结果.jpg)]

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/15602.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信