本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2024-11(1)

2021年大数据Hadoop(十九):MapReduce分区

发布于2021-05-30 20:56     阅读(912)     评论(0)     点赞(27)     收藏(2)


全网最详细的Hadoop文章系列,强烈建议收藏加关注!

后面更新文章都会列出历史文章目录,帮助大家回顾知识重点。

目录

本系列历史文章

前言

MapReduce分区

​​​​​​​分区概述

​​​​​​​分区步骤

1、定义 Mapper

2、自定义Partitioner

3、定义 Reducer 逻辑

4、主类中设置分区类和ReduceTask个数


本系列历史文章

2021年大数据Hadoop(十八):MapReduce程序运行模式和深入解析

2021年大数据Hadoop(十七):MapReduce编程规范及示例编写

2021年大数据Hadoop(十六):MapReduce计算模型介绍

2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation

2021年大数据Hadoop(十四):HDFS的高可用机制

2021年大数据Hadoop(十三):HDFS意想不到的其他功能

2021年大数据Hadoop(十二):HDFS的API操作

2021年大数据Hadoop(十一):HDFS的元数据辅助管理

2021年大数据Hadoop(十):HDFS的数据读写流程

2021年大数据Hadoop(九):HDFS的高级使用命令

2021年大数据Hadoop(八):HDFS的Shell命令行使用

2021年大数据Hadoop(七):HDFS分布式文件系统简介

2021年大数据Hadoop(六):全网最详细的Hadoop集群搭建

2021年大数据Hadoop(五):Hadoop架构

2021年大数据Hadoop(四):Hadoop发行版公司

2021年大数据Hadoop(三):Hadoop国内外应用

2021年大数据Hadoop(二):Hadoop发展简史和特性优点

2021年大数据Hadoop(一):Hadoop介绍

 

前言

 2021年全网最详细的大数据笔记,轻松带你从入门到精通,该栏目每天更新,汇总知识分享

 

​​​​​​​MapReduce分区

 

​​​​​​​分区概述

在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个Reduce当中进行处理。例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等

其实就是相同类型的数据, 有共性的数据, 送到一起去处理, 在Reduce过程中,可以根据实际需求(比如按某个维度进行归档,类似于数据库的分组),把Map完的数据Reduce到不同的文件中。分区的设置需要与ReduceTaskNum配合使用。比如想要得到5个分区的数据结果。那么就得设置5个ReduceTask。

需求:将以下数据进行分开处理

详细数据参见partition.csv  这个文本文件,其中第五个字段表示开奖结果数值,现在需求将15以上的结果以及15以下的结果进行分开成两个文件进行保存

 

​​​​​​​分区步骤

1、定义 Mapper

这个 Mapper 程序不做任何逻辑, 也不对 Key-Value 做任何改变, 只是接收数据, 然后往下发送

  1. public class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
  2.     @Override
  3.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  4.         context.write(value,NullWritable.get());
  5.     }
  6. }

 

2、自定义Partitioner

主要的逻辑就在这里, 这也是这个案例的意义, 通过 Partitioner 将数据分发给不同的 Reducer

  1. /**
  2.  * 这里的输入类型与我们map阶段的输出类型相同
  3.  */
  4. public class MyPartitioner extends Partitioner<Text,NullWritable>{
  5.     /**
  6.      * 返回值表示我们的数据要去到哪个分区
  7.      * 返回值只是一个分区的标记,标记所有相同的数据去到指定的分区
  8.      */
  9.     @Override
  10.     public int getPartition(Text text, NullWritable nullWritable, int i) {
  11.         String result = text.toString().split("\t")[5];
  12.         if (Integer.parseInt(result) > 15){
  13.             return 1;
  14.         }else{
  15.             return 0;
  16.         }
  17.     }
  18. }

 

3、定义 Reducer 逻辑

这个 Reducer 也不做任何处理, 将数据原封不动的输出即可

  1. public class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable{
  2.     @Override
  3.     protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  4.         context.write(key,NullWritable.get());
  5.     }
  6. }

 

4、主类中设置分区类和ReduceTask个数

  1. public class PartitionerRunner {
  2.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  3.         //1、创建建一个job任务对象
  4.         Configuration configuration = new Configuration();
  5.         Job job = Job.getInstance(configuration, "mypartitioner");
  6.         //2、指定job所在的jar包
  7.         job.setJarByClass(PartitionerRunner.class);
  8.         //3、指定源文件的读取方式类和源文件的读取路径
  9.         job.setInputFormatClass(TextInputFormat.class); //按照行读取
  10.         TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/partitioner")); //只需要指定源文件所在的目录即可
  11.         // TextInputFormat.addInputPath(job, new Path("file:///E:\\input\\partitioner")); //只需要指定源文件所在的目录即可
  12.         //4、指定自定义的Mapper类和K2、V2类型
  13.         job.setMapperClass(PartitionerMapper.class); //指定Mapper类
  14.         job.setMapOutputKeyClass(Text.class); //K2类型
  15.         job.setMapOutputValueClass(NullWritable.class);//V2类型
  16.         //5、指定自定义分区类(如果有的话)
  17.         job.setPartitionerClass(MyPartitioner.class);
  18.         //6、指定自定义分组类(如果有的话)
  19.         //7、指定自定义的Reducer类和K3、V3的数据类型
  20.         job.setReducerClass(PartitionerReducer.class); //指定Reducer类
  21.         job.setOutputKeyClass(Text.class); //K3类型
  22.         job.setOutputValueClass(NullWritable.class);  //V3类型
  23.         //设置Reduce的个数
  24.         job.setNumReduceTasks(2);
  25.         //8、指定输出方式类和结果输出路径
  26.         job.setOutputFormatClass(TextOutputFormat.class);
  27.         TextOutputFormat.setOutputPath(job, new  Path("hdfs://node1:8020/output/partitioner")); //目标目录不能存在,否则报错
  28.         //TextOutputFormat.setOutputPath(job, new  Path("file:///E:\\output\\partitoner")); //目标目录不能存在,否则报错
  29.         //9、将job提交到yarn集群
  30.         boolean bl = job.waitForCompletion(true); //true表示可以看到任务的执行进度
  31.         //10.退出执行进程
  32.         System.exit(bl?0:1);
  33.     }
  34. }


所属网站分类: 技术文章 > 博客

作者:匿名哭哭

链接:http://www.phpheidong.com/blog/article/86909/d0d11fc1a06d55719f7e/

来源:php黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

27 0
收藏该文
已收藏

评论内容:(最多支持255个字符)