📜  MapReduce-分区程序

📅  最后修改于: 2020-11-30 05:32:01             🧑  作者: Mango


分区器的工作原理类似于处理输入数据集的条件。分区阶段发生在Map阶段之后以及Reduce阶段之前。

分隔器的数量等于减速器的数量。这意味着分区器将根据缩减器的数量对数据进行拆分。因此,从单个分区程序传递的数据由单个Reducer处理。

分区器

分区程序对中间Map输出的键值对进行分区。它使用用户定义的条件对数据进行分区,该条件类似于哈希函数。分区总数与作业的Reducer任务数相同。让我们举一个例子来了解分区器是如何工作的。

MapReduce分区程序实现

为了方便起见,让我们假设我们有一个名为Employee的小表,其中包含以下数据。我们将使用此样本数据作为输入数据集来演示分区程序的工作方式。

Id Name Age Gender Salary
1201 gopal 45 Male 50,000
1202 manisha 40 Female 50,000
1203 khalil 34 Male 30,000
1204 prasanth 30 Male 30,000
1205 kiran 20 Male 40,000
1206 laxmi 25 Female 35,000
1207 bhavya 20 Female 15,000
1208 reshma 19 Female 15,000
1209 kranthi 22 Male 22,000
1210 Satish 24 Male 25,000
1211 Krishna 25 Male 25,000
1212 Arshad 28 Male 20,000
1213 lavanya 18 Female 8,000

我们必须编写一个应用程序来处理输入数据集,以找到不同年龄段(例如20岁以下,21至30岁之间,30岁以上)按性别划分的薪水最高的员工。

输入数据

以上数据另存为input.txt在“ / home / hadoop / hadoopPartitioner”目录中,并作为输入提供。

1201 gopal 45 Male 50000
1202 manisha 40 Female 51000
1203 khaleel 34 Male 30000
1204 prasanth 30 Male 31000
1205 kiran 20 Male 40000
1206 laxmi 25 Female 35000
1207 bhavya 20 Female 15000
1208 reshma 19 Female 14000
1209 kranthi 22 Male 22000
1210 Satish 24 Male 25000
1211 Krishna 25 Male 26000
1212 Arshad 28 Male 20000
1213 lavanya 18 Female 8000

根据给定的输入,以下是该程序的算法说明。

地图任务

当我们将文本数据保存在文本文件中时,map任务接受键值对作为输入。此地图任务的输入如下-

输入-键将是一种模式,例如“任何特殊键+文件名+行号”(例如:key = @ input1),则该值将是该行中的数据(例如:value = 1201 \ t gopal \ t 45 \ t Male \ t 50000)。

方法-此映射任务的操作如下-

  • 从参数列表以字符串读取(记录数据),该值作为输入值。

  • 使用split函数,分隔性别并将其存储在字符串变量中。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 将性别信息和记录数据作为输出键值对从映射任务发送到分区任务

context.write(new Text(gender), new Text(value));
  • 对文本文件中的所有记录重复上述所有步骤。

输出-您将获得性别数据和记录数据值作为键值对。

分区任务

分区器任务接受来自映射任务的键-值对作为其输入。分区意味着将数据划分为多个段。根据给定的分区条件条件,可以将输入的键值对数据根据年龄条件分为三部分。

输入-键值对集合中的整个数据。

键=记录中的性别字段值。

值=该性别的整个记录数据值。

方法-分区逻辑的过程如下运行。

  • 从输入键值对中读取年龄字段值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 在以下条件下检查年龄值。

    • 年龄小于或等于20
    • 年龄大于20岁且小于或等于30岁。
    • 年龄大于30岁。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

输出-键值对的整个数据被分为三个键值对集合。 Reducer在每个集合上单独工作。

减少任务

分区程序任务的数量等于简化程序任务的数量。在这里,我们有三个分区程序任务,因此有三个要执行的Reducer任务。

输入-Reducer将使用不同的键值对集合执行三次。

键=记录中的性别字段值。

值=该性别的整个记录数据。

方法-以下逻辑将应用于每个集合。

  • 读取每个记录的薪金字段值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • 用max变量检查薪水。如果str [4]是最高工资,则将str [4]分配给max,否则跳过该步骤。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 对每个密钥集合重复步骤1和2(男性和女性是密钥集合)。执行完这三个步骤后,您将从“男性”钥匙集中找到一个最高工资,从“女性”钥匙集中找到一个最高工资。

context.write(new Text(key), new IntWritable(max));

输出-最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含每个年龄组中男性收入的最高薪水和女性收入的最高薪水。

执行Map,Partitioner和Reduce任务后,键-值对数据的三个集合存储在三个不同的文件中作为输出。

所有这三个任务都被视为MapReduce作业。这些作业的以下要求和规格应在“配置”中指定-

  • 工作名称
  • 键和值的输入和输出格式
  • Map,Reduce和Partitioner任务的各个类
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

范例程序

以下程序显示了如何在MapReduce程序中为给定条件实现分区程序。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
    
   public static class MapClass extends Mapper
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
    
   public static class ReduceClass extends Reducer
   {
      public int max = -1;
      public void reduce(Text key, Iterable  values, Context context) throws IOException, InterruptedException
      {
         max = -1;
            
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
            
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
    
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
        
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
        
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
        
      job.setMapperClass(MapClass.class);
        
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
        
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
        
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
        
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

将以上代码另存为“ / home / hadoop / hadoopPartitioner”中的PartitionerExample.java 。该程序的编译和执行如下。

编译与执行

让我们假设我们位于Hadoop用户的主目录中(例如/ home / hadoop)。

请按照下面给出的步骤来编译和执行上述程序。

步骤1-下载用于编译和执行MapReduce程序的Hadoop-core-1.2.1.jar。您可以从mvnrepository.com下载jar。

让我们假设下载的文件夹是“ / home / hadoop / hadoopPartitioner”

步骤2-以下命令用于编译程序PartitionerExample.java并为该程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

步骤3-使用以下命令在HDFS中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步骤4-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

步骤5-使用以下命令来验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤6-使用以下命令通过从输入目录获取输入文件来运行最高薪水应用程序。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,映射任务和Reducer任务。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
    
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
    
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
    
   Data-local map tasks=1
    
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
    
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
    
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
    
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
    
   Input split bytes=119
    
   Combine input records=0
   Combine output records=0
    
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
    
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
    
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
    
   Total committed heap usage (bytes)=334102528
    
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
    
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
    
File Input Format Counters

   Bytes Read=361
    
File Output Format Counters

   Bytes Written=72

步骤7-使用以下命令来验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区程序和三个Reducer。

步骤8-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

00000部分输出

Female   15000
Male     40000

使用以下命令查看Part-00001文件中的输出。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

在第00001部分中输出

Female   35000
Male    31000

使用以下命令查看Part-00002文件中的输出。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

在00002部分中输出

Female  51000
Male   50000