📅  最后修改于: 2020-11-30 05:30:40             🧑  作者: Mango
MapReduce是一个框架,用于编写应用程序以可靠的方式处理大型商用硬件集群上的大量数据。本章将指导您使用Java在Hadoop框架中操作MapReduce。
通常,MapReduce范例基于将map-reduce程序发送到实际数据所在的计算机。
在MapReduce作业期间,Hadoop将Map和Reduce任务发送到集群中的相应服务器。
该框架管理数据传递的所有细节,例如发布任务,验证任务完成以及在节点之间的集群周围复制数据。
大多数计算是在本地磁盘上的数据在节点上进行的,从而减少了网络流量。
完成给定任务后,集群将收集并减少数据以形成适当的结果,然后将其发送回Hadoop服务器。
MapReduce框架对键值对进行操作,也就是说,该框架将作业的输入视为一组键值对,并生成一组键值对作为作业的输出(可能是不同类型)。
键和值类必须由框架可序列化,因此,需要实现Writable接口。此外,关键类必须实现WritableComparable接口,以利于框架进行排序。
MapReduce作业的输入和输出格式均为键值对形式-
(输入)
Input | Output | |
---|---|---|
Map | list ( |
|
Reduce | list ( |
下表显示了有关组织用电量的数据。该表包括连续五年的每月用电量和年平均电量。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
我们需要编写应用程序来处理给定表中的输入数据,以查找最大使用年份,最小使用年份,等等。对于具有有限记录量的程序员而言,此任务很容易,因为他们只需编写逻辑以生成所需的输出,然后将数据传递给编写的应用程序即可。
现在让我们提高输入数据的规模。假设我们必须分析特定州所有大型工业的用电量。当我们编写应用程序来处理此类批量数据时,
他们将花费很多时间来执行。
当我们将数据从源移动到网络服务器时,网络流量将会很大。
为了解决这些问题,我们有MapReduce框架。
以上数据另存为sample.txt,并作为输入提供。输入文件如下所示。
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
以下示例数据程序使用MapReduce框架。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
将以上程序保存到ProcessUnits.java中。该程序的编译和执行如下。
让我们假设我们位于Hadoop用户的主目录中(例如/ home / hadoop)。
请按照下面给出的步骤来编译和执行上述程序。
步骤1-使用以下命令创建一个目录来存储已编译的java类。
$ mkdir units
步骤2-下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce程序。从mvnrepository.com下载jar。让我们假设下载文件夹为/ home / hadoop /。
步骤3-以下命令用于编译ProcessUnits.java程序并为该程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
步骤4-以下命令用于在HDFS中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤5-以下命令用于将名为sample.txt的输入文件复制到HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
步骤6-以下命令用于验证输入目录中的文件
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤7-以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到文件被执行。执行后,输出包含许多输入分割,映射任务,Reducer任务等。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
步骤8-以下命令用于验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步骤9-以下命令用于查看Part-00000文件中的输出。该文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是MapReduce程序生成的输出-
1981 | 34 |
1984 | 40 |
1985 | 45 |
步骤10-以下命令用于将输出文件夹从HDFS复制到本地文件系统。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop