📜  如何使用 Cloudera Distribution Hadoop(CDH) 在 MapReduce 中执行 WordCount 程序

📅  最后修改于: 2021-10-27 06:36:39             🧑  作者: Mango

先决条件:Hadoop 和 MapReduce

计算任何语言中的单词数量就像在 C、C++、 Python、 Java等中一样小菜一碟。 MapReduce 也使用Java,但如果您知道如何编写它的语法,它会很容易。它是 MapReduce 的基础。您将首先学习如何用其他语言执行类似于“Hello World”程序的这段代码。所以这里是展示如何为 Word Count 编写 MapReduce 代码的步骤。
例子:
输入:

Hello I am GeeksforGeeks
Hello I am an Intern

输出:

GeeksforGeeks  1
Hello    2
I        2
Intern   1
am       2
an       1

脚步:

  • 首先打开Eclipse -> 然后选择File -> New -> Java Project -> Name it WordCount -> 然后Finish

  • 在项目中创建三个Java类。将它们命名为WCDriver (具有主要函数)、 WCMapperWCReducer
  • 您必须为此包含两个参考库:
    右键单击Project -> 然后选择Build Path -> 单击Configure Build Path

  • 在上图中,您可以在右侧看到 Add External JARs 选项。单击它并添加以下提及文件。您可以在/usr/lib/ 中找到这些文件
    1. /usr/lib/hadoop-0.20-mapreduce/hadoop-core-2.6.0-mr1-cdh5.13.0.jar
    2. /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.13.0.jar

Mapper 代码:您必须将此程序复制粘贴到 WCMapper Java Class 文件中。

Java
// Importing libraries
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
 
public class WCMapper extends MapReduceBase implements Mapper {
 
    // Map function
    public void map(LongWritable key, Text value, OutputCollector output, Reporter rep) throws IOException
    {
 
        String line = value.toString();
 
        // Splitting the line on spaces
        for (String word : line.split(" "))
        {
            if (word.length() > 0)
            {
                output.collect(new Text(word), new IntWritable(1));
            }
        }
    }
}


Java
// Importing libraries
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
 
public class WCReducer extends MapReduceBase implements Reducer {
 
    // Reduce function
    public void reduce(Text key, Iterator value,
                   OutputCollector output,
                            Reporter rep) throws IOException
    {
 
        int count = 0;
 
        // Counting the frequency of each words
        while (value.hasNext())
        {
            IntWritable i = value.next();
            count += i.get();
        }
 
        output.collect(key, new IntWritable(count));
    }
}


Java
// Importing libraries
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WCDriver extends Configured implements Tool {
 
    public int run(String args[]) throws IOException
    {
        if (args.length < 2)
        {
            System.out.println("Please give valid inputs");
            return -1;
        }
 
        JobConf conf = new JobConf(WCDriver.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(WCMapper.class);
        conf.setReducerClass(WCReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
        return 0;
    }
 
    // Main Method
    public static void main(String args[]) throws Exception
    {
        int exitCode = ToolRunner.run(new WCDriver(), args);
        System.out.println(exitCode);
    }
}


Reducer 代码:您必须将此程序复制粘贴到 WCReducer Java Class 文件中。

Java

// Importing libraries
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
 
public class WCReducer extends MapReduceBase implements Reducer {
 
    // Reduce function
    public void reduce(Text key, Iterator value,
                   OutputCollector output,
                            Reporter rep) throws IOException
    {
 
        int count = 0;
 
        // Counting the frequency of each words
        while (value.hasNext())
        {
            IntWritable i = value.next();
            count += i.get();
        }
 
        output.collect(key, new IntWritable(count));
    }
}

驱动程序代码:您必须将此程序复制粘贴到 WCDriver Java类文件中。

Java

// Importing libraries
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WCDriver extends Configured implements Tool {
 
    public int run(String args[]) throws IOException
    {
        if (args.length < 2)
        {
            System.out.println("Please give valid inputs");
            return -1;
        }
 
        JobConf conf = new JobConf(WCDriver.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(WCMapper.class);
        conf.setReducerClass(WCReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
        return 0;
    }
 
    // Main Method
    public static void main(String args[]) throws Exception
    {
        int exitCode = ToolRunner.run(new WCDriver(), args);
        System.out.println(exitCode);
    }
}
  • 现在你必须制作一个 jar 文件。右键单击项目->单击导出->选择导出目标为 Jar 文件->命名 jar 文件(WordCount.jar) ->单击下一步-> 最后单击完成。现在将此文件复制到 Cloudera 的 Workspace 目录中

  • 在 CDH 上打开终端并将目录更改为工作区。您可以使用“cd workspace/”命令来完成此操作。现在,创建一个文本文件( WCFile.txt )并将其移动到 HDFS。对于那个打开的终端并编写此代码(请记住,您应该与刚刚创建的 jar 文件位于同一目录中)。

  • 现在,运行此命令将文件输入文件复制到 HDFS。
hadoop fs -put WCFile.txt WCFile.txt

  • 现在通过编写代码来运行 jar 文件,如屏幕截图所示。

  • 执行代码后,您可以在WCOutput文件中或通过在终端上编写以下命令来查看结果。
hadoop fs -cat WCOutput/part-00000