📜  MapReduce-合并器

📅  最后修改于: 2020-11-30 05:32:53             🧑  作者: Mango


合并器,也称为半缩减器,是一个可选类,其操作是通过接受Map类的输入,然后将输出键值对传递给Reducer类。

组合器的主要函数是使用相同的键来汇总地图输出记录。组合器的输出(键值集合)将通过网络发送到实际的Reducer任务作为输入。

合路器

在Map类和Reduce类之间使用Combiner类可减少Map和Reduce之间的数据传输量。通常,映射任务的输出很大,并且传输到reduce任务的数据很大。

以下MapReduce任务图显示了COMBINER PHASE。

合路器

组合器如何工作?

这是有关MapReduce Combiner的工作原理的简短摘要-

  • 组合器没有预定义的接口,并且必须实现Reducer接口的reduce()方法。

  • 组合器对每个映射输出键进行操作。它必须具有与Reducer类相同的输出键值类型。

  • 组合器可以代替大型Map输出,因此可以从大型数据集中生成摘要信息。

尽管Combiner是可选的,但它有助于在Reduce阶段将数据分为多个组,这使处理变得更容易。

MapReduce合并器实现

以下示例提供了有关组合器的理论思想。让我们假设我们为MapReduce有以下名为input.txt的输入文本文件。

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

下面讨论了带有合并器的MapReduce程序的重要阶段。

记录阅读器

这是MapReduce的第一阶段,其中记录读取器从输入文本文件中以文本形式读取每一行,并以键值对的形式输出输出。

输入-输入文件中的逐行文本。

输出-形成键值对。以下是一组预期的键值对。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

地图阶段

映射阶段从记录读取器获取输入,进行处理,然后将输出作为另一组键值对产生。

输入-以下键值对是从记录读取器获取的输入。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Map阶段读取每个键值对,使用StringTokenizer将每个单词与值相除,将每个单词视为键,并将该单词的计数视为值。以下代码段显示了Mapper类和map函数。

public static class TokenizerMapper extends Mapper
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

输出-预期输出如下-

     
     
    
    

合并阶段

Combiner阶段从Map阶段获取每个键值对,对其进行处理,并将输出作为键值集合对产生。

输入-以下键值对是从Map阶段获取的输入。

     
     
    
    

Combiner阶段读取每个键值对,将常见单词组合为键,将值组合为集合。通常,组合器的代码和操作与简化器的代码和操作相似。以下是Mapper,Combiner和Reducer类声明的代码段。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

输出-预期输出如下-

     
  
  
   

减速器相

Reducer阶段从Combiner阶段获取每个键值集合对,对其进行处理,然后将输出作为键值对传递。请注意,Combiner功能与Reducer相同。

输入-以下键值对是从Combiner阶段获取的输入。

     
  
  
   

Reducer阶段读取每个键值对。以下是Combiner的代码段。

public static class IntSumReducer extends Reducer 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

输出-减速器阶段的预期输出如下-

     
  
  
   

唱片作家

这是MapReduce的最后一个阶段,其中Record Writer写入Reducer阶段中的每个键值对,并将输出作为文本发送。

输入-Reducer阶段中的每个键值对以及输出格式。

输出-它以文本格式为您提供键-值对。以下是预期的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

范例程序

以下代码块计算程序中的单词数。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
        
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
        
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
        
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

将上述程序另存为WordCount.java 。该程序的编译和执行如下。

编译与执行

让我们假设我们位于Hadoop用户的主目录中(例如/ home / hadoop)。

请按照下面给出的步骤来编译和执行上述程序。

步骤1-使用以下命令创建一个目录来存储已编译的java类。

$ mkdir units

步骤2-下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce程序。您可以从mvnrepository.com下载jar。

让我们假设下载的文件夹是/ home / hadoop /。

步骤3-使用以下命令来编译WordCount.java程序并为该程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

步骤4-使用以下命令在HDFS中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步骤5-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

步骤6-使用以下命令来验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤7-使用以下命令通过从输入目录获取输入文件来运行字数统计应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,映射任务和Reducer任务。

步骤8-使用以下命令来验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤9-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1