📅  最后修改于: 2023-12-03 14:41:41.025000             🧑  作者: Mango
Hadoop是一个由Apache开发的用于处理大数据集的分布式计算框架。其中的MapReduce是它的一个关键特性,它通过将任务分成多个子任务来并行计算分布式数据集。本文将介绍Hadoop中的MapReduce,涵盖以下内容:
MapReduce的工作原理可以简单的分为两个步骤:映射(Map)和归约(Reduce)。
在映射步骤中,MapReduce将数据集分成若干个小数据块,并在多台计算机上同时执行(即分布式计算)映射函数。映射函数处理的数据集通常略大于小数据块,这允许映射函数同时处理多个数据项。MapReduce收集和排序映射函数生成的中间结果,以便在下一步归约阶段的输入数据有序。此外,设备本身的性能,网络带宽和计算速度差异,会导致数据拆分比例不均匀。针对这种情况,Hadoop MapReduce采用较为平衡的负载均衡策略通过转移处理更快设备执行,以充分利用资源,提高处理效率。
归约步骤的任务是输出结果,在所有这些中间结果上执行单个归约函数。每个中间结果都是一个键值对,其中键表示分组依据。
在MapReduce任务完成之前,还必须进行排序、分组和分区等操作。这些操作用于确保数据以正确的顺序传递到归约步骤。
MapReduce由Hadoop自身提供,包含以下三个部分:
Map函数
Map函数将输入数据转化为中间数据。所谓的中间数据集就是一个键值对(key-value pair)的集合,其中key是中间数据的关键字,value是对应key的中间结果。
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Reduce函数
Reduce函数将中间数据转换为最终输出。其中,同一key的中间结果集在一个reduce function内执行。
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
任务驱动器(Job Driver)
任务驱动器是用于设置和运行MapReduce任务的组件。在任务驱动器中,我们需要指定MapReduce任务需要使用哪些输入和输出文件,以及对应的Map和Reduce函数。还可以在任务驱动器中设置其他相关参数。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在Hadoop中使用MapReduce通常需要以下步骤:
例如,在使用MapReduce计算一个文本文件中单词出现的次数时,可能会使用以下代码:
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在这个例子中,WordCount程序将从命令行参数读取输入和输出路径,并设置Map和Reduce函数。程序的输入使用TextInputFormat读取,输出使用TextOutputFormat写入。
MapReduce作为大数据处理领域的一个标准框架,在众多场景下表现优异,但也存在一些缺点。
分布式计算
MapReduce可以使用分布式计算并行地处理大数据集,提高处理速度。
容错性
Hadoop MapReduce具有很强的容错性。它可以自动地处理计算机故障,确保计算进程中断后可以自动恢复。
扩展性
MapReduce可以扩展到大量计算机上,以处理大量数据。它还可以通过添加更多任务(map或reduce)来进一步扩展。
可靠性
MapReduce程序必须保证算法的正确性和精确度。 MapReduce程序至少需要经过两次全数据集访问才能组合处理。处理时间较长时,容易失去精度。
数据重复
在MapReduce过程中,即使相同的key只处理一次,其对应的value也需要多次处理。这会导致一定数量的数据冗余。
时间延迟
MapReduce处理一组大数据集时,每个元素单独处理的时间都相对很短,但处理整个数据集所需的时间会很长。这可能会导致时间延迟问题。