Prerequisites: Hadoop and MapReduce
Counting the words in a text is a piece of cake in any language, whether C, C++, Python, or Java. MapReduce also uses Java, and once you know how to write its syntax it is just as easy. Word Count is the foundational MapReduce program, the equivalent of the "Hello World" program in other languages (a plain-Java version is sketched after the example below for comparison). Here are the steps showing how to write the MapReduce code for Word Count.
Example:
Input:
Hello I am GeeksforGeeks
Hello I am an Intern
Output:
GeeksforGeeks 1
Hello 2
I 2
Intern 1
am 2
an 1
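For comparison, here is a minimal sketch of the same word count in plain Java. It is a hypothetical standalone program (PlainWordCount is not part of the MapReduce project) that hard-codes the sample input:
Java
// A standalone word count over the sample input (illustration only)
import java.util.HashMap;
import java.util.Map;

public class PlainWordCount {
    public static void main(String[] args) {
        String[] lines = { "Hello I am GeeksforGeeks", "Hello I am an Intern" };
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // Split each line on spaces and tally every non-empty word
            for (String word : line.split(" ")) {
                if (word.length() > 0) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}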
Steps:
- First open Eclipse -> then select File -> New -> Java Project -> name it WordCount -> then click Finish.
- Create three Java classes inside the project. Name them WCDriver (which has the main function), WCMapper, and WCReducer.
- You have to include two reference libraries for this: right-click the project -> then select Build Path -> click Configure Build Path.
- In the dialog that opens, you will see the Add External JARs option on the right-hand side. Click it and add the files mentioned below. You can find these files in /usr/lib/:
1. /usr/lib/hadoop-0.20-mapreduce/hadoop-core-2.6.0-mr1-cdh5.13.0.jar
2. /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.13.0.jar
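The exact jar names vary with the Hadoop/CDH release. If the paths above do not exist on your machine, a command along these lines will locate the equivalents:
find /usr/lib -name "hadoop-core*.jar" -o -name "hadoop-common*.jar"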
Mapper Code: You have to copy-paste this program into the WCMapper Java class file.
Java
// Importing libraries
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WCMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    // Map function: emits a (word, 1) pair for every word in the line
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter rep) throws IOException
    {
        String line = value.toString();

        // Splitting the line on spaces
        for (String word : line.split(" "))
        {
            if (word.length() > 0)
            {
                output.collect(new Text(word), new IntWritable(1));
            }
        }
    }
}
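For the sample input above, the mapper emits one (word, 1) pair per occurrence, for example (Hello, 1) once for each of the two lines. The framework then shuffles and groups these pairs by key before handing them to the reducer.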
Reducer Code: You have to copy-paste this program into the WCReducer Java class file.
Java
// Importing libraries
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WCReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Reduce function: sums the 1s emitted by the mapper for each word
    public void reduce(Text key, Iterator<IntWritable> value,
                       OutputCollector<Text, IntWritable> output,
                       Reporter rep) throws IOException
    {
        int count = 0;

        // Counting the frequency of each word
        while (value.hasNext())
        {
            IntWritable i = value.next();
            count += i.get();
        }

        output.collect(key, new IntWritable(count));
    }
}
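Continuing the example: after the shuffle, the reducer receives each word together with an iterator over the 1s emitted for it, e.g. (Hello, [1, 1]), and sums them to produce (Hello, 2).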
Driver Code: You have to copy-paste this program into the WCDriver Java class file.
Java
// Importing libraries
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WCDriver extends Configured implements Tool {

    public int run(String args[]) throws IOException
    {
        // Expect two arguments: the input path and the output path
        if (args.length < 2)
        {
            System.out.println("Please give valid inputs");
            return -1;
        }

        // Pass along the configuration parsed by ToolRunner
        JobConf conf = new JobConf(getConf(), WCDriver.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(WCMapper.class);
        conf.setReducerClass(WCReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
        return 0;
    }

    // Main method: hands control to ToolRunner, which parses generic
    // Hadoop options before calling run()
    public static void main(String args[]) throws Exception
    {
        int exitCode = ToolRunner.run(new WCDriver(), args);
        System.out.println(exitCode);
    }
}
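Extending Configured and implementing Tool is what allows ToolRunner to parse generic Hadoop options (such as -D key=value) from the command line and hand the resulting configuration to run() via getConf().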
- Now you have to make a jar file. Right-click the project -> click Export -> select the export destination as JAR file -> name the jar file (WordCount.jar) -> click Next -> finally click Finish. Now copy this file into Cloudera's workspace directory.
- Open a terminal on CDH and change the directory to the workspace. You can do this with the command "cd workspace/". Now create a text file (WCFile.txt), as sketched below, and move it to HDFS (remember that you should be in the same directory as the jar file you just created).
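One simple way to create the input file with the sample text (any editor works just as well; press Ctrl+D on an empty line to finish):
cat > WCFile.txt
Hello I am GeeksforGeeks
Hello I am an Intern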
- Now run this command to copy the input file into HDFS.
hadoop fs -put WCFile.txt WCFile.txt
- Now run the jar file with a command like the one shown below.
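The exact invocation depends on your jar and class names; assuming the WordCount.jar, WCDriver, WCFile.txt, and WCOutput names used throughout this article, it looks like this:
hadoop jar WordCount.jar WCDriver WCFile.txt WCOutput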
- After the code executes, you can view the result in the WCOutput directory or by running the following command on the terminal.
hadoop fs -cat WCOutput/part-00000