📜  MapReduce-Hadoop实施

📅  最后修改于: 2020-11-30 05:30:40             🧑  作者: Mango


MapReduce是一个框架,用于编写应用程序以可靠的方式处理大型商用硬件集群上的大量数据。本章将指导您使用Java在Hadoop框架中操作MapReduce。

MapReduce算法

通常,MapReduce范例基于将map-reduce程序发送到实际数据所在的计算机。

  • 在MapReduce作业期间,Hadoop将Map和Reduce任务发送到集群中的相应服务器。

  • 该框架管理数据传递的所有细节,例如发布任务,验证任务完成以及在节点之间的集群周围复制数据。

  • 大多数计算是在本地磁盘上的数据在节点上进行的,从而减少了网络流量。

  • 完成给定任务后,集群将收集并减少数据以形成适当的结果,然后将其发送回Hadoop服务器。

MapReduce算法

输入和输出(Java透视图)

MapReduce框架对键值对进行操作,也就是说,该框架将作业的输入视为一组键值对,并生成一组键值对作为作业的输出(可能是不同类型)。

键和值类必须由框架可序列化,因此,需要实现Writable接口。此外,关键类必须实现WritableComparable接口,以利于框架进行排序。

MapReduce作业的输入和输出格式均为键值对形式-

(输入)->映射-> ->缩小-> (输出)。

Input Output
Map list ()
Reduce list ()

MapReduce实施

下表显示了有关组织用电量的数据。该表包括连续五年的每月用电量和年平均电量。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

我们需要编写应用程序来处理给定表中的输入数据,以查找最大使用年份,最小使用年份,等等。对于具有有限记录量的程序员而言,此任务很容易,因为他们只需编写逻辑以生成所需的输出,然后将数据传递给编写的应用程序即可。

现在让我们提高输入数据的规模。假设我们必须分析特定州所有大型工业的用电量。当我们编写应用程序来处理此类批量数据时,

  • 他们将花费很多时间来执行。

  • 当我们将数据从源移动到网络服务器时,网络流量将会很大。

为了解决这些问题,我们有MapReduce框架。

输入数据

以上数据另存为sample.txt,并作为输入提供。输入文件如下所示。

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

范例程序

以下示例数据程序使用MapReduce框架。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
    
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator  values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
    
   //Main function
    
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
        
      conf.setJobName("max_eletricityunits");
        
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
        
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
        
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
        
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        
      JobClient.runJob(conf);
   }
}

将以上程序保存到ProcessUnits.java中。该程序的编译和执行如下。

ProcessUnits程序的编译和执行

让我们假设我们位于Hadoop用户的主目录中(例如/ home / hadoop)。

请按照下面给出的步骤来编译和执行上述程序。

步骤1-使用以下命令创建一个目录来存储已编译的java类。

$ mkdir units

步骤2-下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce程序。从mvnrepository.com下载jar。让我们假设下载文件夹为/ home / hadoop /。

步骤3-以下命令用于编译ProcessUnits.java程序并为该程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

步骤4-以下命令用于在HDFS中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步骤5-以下命令用于将名为sample.txt的输入文件复制到HDFS的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

步骤6-以下命令用于验证输入目录中的文件

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤7-以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段时间,直到文件被执行。执行后,输出包含许多输入分割,映射任务,Reducer任务等。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
    
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
    
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
    
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
    
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
    
   Input split bytes=208
   Combine input records=5
   Combine output records=5
    
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
    
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
    
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
    
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
    
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

步骤8-以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤9-以下命令用于查看Part-00000文件中的输出。该文件由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是MapReduce程序生成的输出-

1981 34
1984 40
1985 45

步骤10-以下命令用于将输出文件夹从HDFS复制到本地文件系统。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop