📜  Hadoop 中的 Map Reduce(1)

📅  最后修改于: 2023-12-03 14:41:41.025000             🧑  作者: Mango

Hadoop中的Map Reduce

Hadoop是一个由Apache开发的用于处理大数据集的分布式计算框架。其中的MapReduce是它的一个关键特性,它通过将任务分成多个子任务来并行计算分布式数据集。本文将介绍Hadoop中的MapReduce,涵盖以下内容:

  1. MapReduce的工作原理
  2. MapReduce的组成部分
  3. MapReduce的使用方法
  4. MapReduce的优缺点
MapReduce的工作原理

MapReduce的工作原理可以简单的分为两个步骤:映射(Map)和归约(Reduce)。

在映射步骤中,MapReduce将数据集分成若干个小数据块,并在多台计算机上同时执行(即分布式计算)映射函数。映射函数处理的数据集通常略大于小数据块,这允许映射函数同时处理多个数据项。MapReduce收集和排序映射函数生成的中间结果,以便在下一步归约阶段的输入数据有序。此外,设备本身的性能,网络带宽和计算速度差异,会导致数据拆分比例不均匀。针对这种情况,Hadoop MapReduce采用较为平衡的负载均衡策略通过转移处理更快设备执行,以充分利用资源,提高处理效率。

归约步骤的任务是输出结果,在所有这些中间结果上执行单个归约函数。每个中间结果都是一个键值对,其中键表示分组依据。

在MapReduce任务完成之前,还必须进行排序、分组和分区等操作。这些操作用于确保数据以正确的顺序传递到归约步骤。

MapReduce的组成部分

MapReduce由Hadoop自身提供,包含以下三个部分:

  1. Map函数

    Map函数将输入数据转化为中间数据。所谓的中间数据集就是一个键值对(key-value pair)的集合,其中key是中间数据的关键字,value是对应key的中间结果。

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {

    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
  1. Reduce函数

    Reduce函数将中间数据转换为最终输出。其中,同一key的中间结果集在一个reduce function内执行。

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
       throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
  1. 任务驱动器(Job Driver)

    任务驱动器是用于设置和运行MapReduce任务的组件。在任务驱动器中,我们需要指定MapReduce任务需要使用哪些输入和输出文件,以及对应的Map和Reduce函数。还可以在任务驱动器中设置其他相关参数。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class WordCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
MapReduce的使用方法

在Hadoop中使用MapReduce通常需要以下步骤:

  1. 导入必要的类库
  2. 书写Map和Reduce函数
  3. 设置任务驱动器
  4. 运行任务

例如,在使用MapReduce计算一个文本文件中单词出现的次数时,可能会使用以下代码:

public class WordCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

在这个例子中,WordCount程序将从命令行参数读取输入和输出路径,并设置Map和Reduce函数。程序的输入使用TextInputFormat读取,输出使用TextOutputFormat写入。

MapReduce的优缺点

MapReduce作为大数据处理领域的一个标准框架,在众多场景下表现优异,但也存在一些缺点。

优点
  1. 分布式计算

    MapReduce可以使用分布式计算并行地处理大数据集,提高处理速度。

  2. 容错性

    Hadoop MapReduce具有很强的容错性。它可以自动地处理计算机故障,确保计算进程中断后可以自动恢复。

  3. 扩展性

    MapReduce可以扩展到大量计算机上,以处理大量数据。它还可以通过添加更多任务(map或reduce)来进一步扩展。

缺点
  1. 可靠性

    MapReduce程序必须保证算法的正确性和精确度。 MapReduce程序至少需要经过两次全数据集访问才能组合处理。处理时间较长时,容易失去精度。

  2. 数据重复

    在MapReduce过程中,即使相同的key只处理一次,其对应的value也需要多次处理。这会导致一定数量的数据冗余。

  3. 时间延迟

    MapReduce处理一组大数据集时,每个元素单独处理的时间都相对很短,但处理整个数据集所需的时间会很长。这可能会导致时间延迟问题。