程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

java hadoop(四) hadoop mapreduce 分区案例

发布于2021-06-12 14:17     阅读(686)     评论(0)     点赞(17)     收藏(2)


我们在编码步骤那章简单介绍了一下分区。

其实通俗来说,分区就是根绝K的某个特征属性(可能是大小、长度以及尾号等等),将K的数据写到不同的文件里。

比如我们根据ID来分区,我们可以根据尾号来分10个文件。

有几个分区就有几个Reduce Task。

 

 

实现分区案例:

我们根据hdfs文件中的这个字段大小进行分区,大于15的一个区,小于15的一个区,最终有两个out文件。

目录结构:

Mapper:

  1. package partitiondemo;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. * 传入四个参数,第一个参数是这一行的起始点在文件中的偏移量 第二个就是读取到的内容
  9. * 第三个 第四个 是我们输出的K2 V2类型
  10. * 思路:将文本内容作为K2。 V2直接为空
  11. */
  12. public class MyMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
  13. @Override
  14. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  15. //value 直接写入空值
  16. context.write(value, NullWritable.get());
  17. }
  18. }

Partitioner:

  1. package partitiondemo;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. * 这里的两个参数就是map的K2 V2
  7. */
  8. public class MyPartitioner extends Partitioner<Text, NullWritable> {
  9. @Override
  10. public int getPartition(Text text, NullWritable nullWritable, int i) {
  11. //取出根据数值分区的那个值
  12. String num = text.toString().split("\t")[5];
  13. //分区
  14. if(Integer.parseInt(num)>15){
  15. return 1;
  16. }else {
  17. return 0;
  18. }
  19. }
  20. }

Reducer:

  1. package partitiondemo;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * 没有处理
  8. */
  9. public class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
  10. @Override
  11. protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  12. context.write(key,NullWritable.get());
  13. }
  14. }

Main:

  1. package partitiondemo;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  9. public class ReducerMain {
  10. public static void main(String[] args) throws Exception{
  11. //创建job
  12. Configuration configuration = new Configuration();
  13. Job job = Job.getInstance(configuration, "my-partition");
  14. //指定job所在jar包
  15. job.setJarByClass(ReducerMain.class);
  16. //指定文件读取方式 路径
  17. job.setInputFormatClass(TextInputFormat.class);
  18. TextInputFormat.addInputPath(job, new Path("hdfs://192.168.40.150:9000/test.txt"));
  19. //指定mapper
  20. job.setMapperClass(MyMapper.class);
  21. job.setMapOutputKeyClass(Text.class);
  22. job.setMapOutputValueClass(NullWritable.class);
  23. //指定自定义分区类
  24. job.setPartitionerClass(MyPartitioner.class);
  25. //指定reducer
  26. job.setReducerClass(MyReducer.class);
  27. job.setOutputKeyClass(Text.class);
  28. job.setOutputValueClass(NullWritable.class);
  29. //设置分区个数 我们是两个reducer
  30. job.setNumReduceTasks(2);
  31. //指定输出方式类以及输出路径 目录必须不存在
  32. job.setOutputFormatClass(TextOutputFormat.class);
  33. TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.40.150:9000/lgy_test/res"));
  34. //将job提交到yarn集群
  35. boolean bl = job.waitForCompletion(true);
  36. System.exit(bl?0:1);
  37. }
  38. }

 

原文链接:https://blog.csdn.net/qq_40771567/article/details/117745451



所属网站分类: 技术文章 > 博客

作者:你不要惹我

链接:http://www.javaheidong.com/blog/article/222244/2e7293808950a778a45d/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

17 0
收藏该文
已收藏

评论内容:(最多支持255个字符)