大家好,欢迎来到IT知识分享网。
MapReduce编程练习3.0
3.0需求:这次统计,也不排序,需要把数据按照手机归属地不同省份输出到不同文件中
问题点:
- 需要输出到不同的文件中
- 需要按手机归属地进行分组
解析:
-
修改reducer的数量即可输出多个文件
-
对于分组,Hadoop默认使用的是key的hash值对reducer个数取余分组,这里实现是通过调用默认的分组类:HashPartitioner进行分区的。
-
HashPartitioner类继承了Partitioner类,然后通过里面的getPartition方法进行分组。我们编写一个类,继承Partitioner类,改写里面的getPartition方法以实现需求。
3.1新需求解决编程
1. 先编写一个分组类
package com.chinasofti.mapreducepractice.flowsum.partition;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/** * 分组类,继承Partitioner类 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
public static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
// 定义分组方式,将对应的值放在一个HashMap中
static{
provinceMap.put("134", 0);
provinceMap.put("135", 1);
provinceMap.put("136", 2);
provinceMap.put("137", 3);
provinceMap.put("138", 4);
}
//重写分组方法,通过
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 这里是截取手机号的前三位数字来作为不同的省份
Integer code = provinceMap.get(key.toString().substring(0, 3));
if (code != null) {
return code;
}
return 5;
}
}
2. Bean、map和reduce类不用改,直接使用第一次需求里的。
3. 将分组方式设置进driver里面来,使之生效。同时修改输出的文件数量,即设置reducer数量为6.
package com.chinasofti.mapreducepractice.flowsum.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowSumDriver {
public static void main(String[] args) throws Exception {
//通过Job来封装本次mr的相关信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定本次mr job jar包运行主类
job.setJarByClass(FlowSumDriver.class);
//指定本次mr 所用的mapper reducer类分别是什么
job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);
//指定本次mr mapper阶段的输出 k v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//指定本次mr 最终输出的 k v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 设置reduce数量为6
// getPartition 返回的分区个数 = NumReduceTasks 正常执行
// getPartition 返回的分区个数 > NumReduceTasks 报错:Illegal partition
// getPartition 返回的分区个数 < NumReduceTasks 可以执行 ,多出空白文件
job.setNumReduceTasks(6);
// 将分组设置添加进来
//这里指定使用我们自定义的分区组件
job.setPartitionerClass(ProvincePartitioner.class);
/* * 本地运行语句 * 注意本地运行时输出目录也不能存在,否则也会报错 */
FileInputFormat.setInputPaths(job,new Path("D:\\Practice_File\\hadoop_practice\\MapReduce\\flowsum\\input"));
FileOutputFormat.setOutputPath(job,new Path("D:\\Practice_File\\hadoop_practice\\MapReduce\\flowsum\\outputProvince"));
// job.submit();
//提交程序 并且监控打印程序执行情况
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/15602.html