📅  最后修改于: 2020-11-30 05:32:01             🧑  作者: Mango
分区器的工作原理类似于处理输入数据集的条件。分区阶段发生在Map阶段之后以及Reduce阶段之前。
分隔器的数量等于减速器的数量。这意味着分区器将根据缩减器的数量对数据进行拆分。因此,从单个分区程序传递的数据由单个Reducer处理。
分区程序对中间Map输出的键值对进行分区。它使用用户定义的条件对数据进行分区,该条件类似于哈希函数。分区总数与作业的Reducer任务数相同。让我们举一个例子来了解分区器是如何工作的。
为了方便起见,让我们假设我们有一个名为Employee的小表,其中包含以下数据。我们将使用此样本数据作为输入数据集来演示分区程序的工作方式。
Id | Name | Age | Gender | Salary |
---|---|---|---|---|
1201 | gopal | 45 | Male | 50,000 |
1202 | manisha | 40 | Female | 50,000 |
1203 | khalil | 34 | Male | 30,000 |
1204 | prasanth | 30 | Male | 30,000 |
1205 | kiran | 20 | Male | 40,000 |
1206 | laxmi | 25 | Female | 35,000 |
1207 | bhavya | 20 | Female | 15,000 |
1208 | reshma | 19 | Female | 15,000 |
1209 | kranthi | 22 | Male | 22,000 |
1210 | Satish | 24 | Male | 25,000 |
1211 | Krishna | 25 | Male | 25,000 |
1212 | Arshad | 28 | Male | 20,000 |
1213 | lavanya | 18 | Female | 8,000 |
我们必须编写一个应用程序来处理输入数据集,以找到不同年龄段(例如20岁以下,21至30岁之间,30岁以上)按性别划分的薪水最高的员工。
以上数据另存为input.txt在“ / home / hadoop / hadoopPartitioner”目录中,并作为输入提供。
1201 | gopal | 45 | Male | 50000 |
1202 | manisha | 40 | Female | 51000 |
1203 | khaleel | 34 | Male | 30000 |
1204 | prasanth | 30 | Male | 31000 |
1205 | kiran | 20 | Male | 40000 |
1206 | laxmi | 25 | Female | 35000 |
1207 | bhavya | 20 | Female | 15000 |
1208 | reshma | 19 | Female | 14000 |
1209 | kranthi | 22 | Male | 22000 |
1210 | Satish | 24 | Male | 25000 |
1211 | Krishna | 25 | Male | 26000 |
1212 | Arshad | 28 | Male | 20000 |
1213 | lavanya | 18 | Female | 8000 |
根据给定的输入,以下是该程序的算法说明。
当我们将文本数据保存在文本文件中时,map任务接受键值对作为输入。此地图任务的输入如下-
输入-键将是一种模式,例如“任何特殊键+文件名+行号”(例如:key = @ input1),则该值将是该行中的数据(例如:value = 1201 \ t gopal \ t 45 \ t Male \ t 50000)。
方法-此映射任务的操作如下-
从参数列表以字符串读取值(记录数据),该值作为输入值。
使用split函数,分隔性别并将其存储在字符串变量中。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
将性别信息和记录数据值作为输出键值对从映射任务发送到分区任务。
context.write(new Text(gender), new Text(value));
对文本文件中的所有记录重复上述所有步骤。
输出-您将获得性别数据和记录数据值作为键值对。
分区器任务接受来自映射任务的键-值对作为其输入。分区意味着将数据划分为多个段。根据给定的分区条件条件,可以将输入的键值对数据根据年龄条件分为三部分。
输入-键值对集合中的整个数据。
键=记录中的性别字段值。
值=该性别的整个记录数据值。
方法-分区逻辑的过程如下运行。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
在以下条件下检查年龄值。
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
输出-键值对的整个数据被分为三个键值对集合。 Reducer在每个集合上单独工作。
分区程序任务的数量等于简化程序任务的数量。在这里,我们有三个分区程序任务,因此有三个要执行的Reducer任务。
输入-Reducer将使用不同的键值对集合执行三次。
键=记录中的性别字段值。
值=该性别的整个记录数据。
方法-以下逻辑将应用于每个集合。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
用max变量检查薪水。如果str [4]是最高工资,则将str [4]分配给max,否则跳过该步骤。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
对每个密钥集合重复步骤1和2(男性和女性是密钥集合)。执行完这三个步骤后,您将从“男性”钥匙集中找到一个最高工资,从“女性”钥匙集中找到一个最高工资。
context.write(new Text(key), new IntWritable(max));
输出-最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含每个年龄组中男性收入的最高薪水和女性收入的最高薪水。
执行Map,Partitioner和Reduce任务后,键-值对数据的三个集合存储在三个不同的文件中作为输出。
所有这三个任务都被视为MapReduce作业。这些作业的以下要求和规格应在“配置”中指定-
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
以下程序显示了如何在MapReduce程序中为给定条件实现分区程序。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer
{
public int max = -1;
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
将以上代码另存为“ / home / hadoop / hadoopPartitioner”中的PartitionerExample.java 。该程序的编译和执行如下。
让我们假设我们位于Hadoop用户的主目录中(例如/ home / hadoop)。
请按照下面给出的步骤来编译和执行上述程序。
步骤1-下载用于编译和执行MapReduce程序的Hadoop-core-1.2.1.jar。您可以从mvnrepository.com下载jar。
让我们假设下载的文件夹是“ / home / hadoop / hadoopPartitioner”
步骤2-以下命令用于编译程序PartitionerExample.java并为该程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
步骤3-使用以下命令在HDFS中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤4-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
步骤5-使用以下命令来验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤6-使用以下命令通过从输入目录获取输入文件来运行最高薪水应用程序。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,映射任务和Reducer任务。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
步骤7-使用以下命令来验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
您将在三个文件中找到输出,因为您在程序中使用了三个分区程序和三个Reducer。
步骤8-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
00000部分输出
Female 15000
Male 40000
使用以下命令查看Part-00001文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
在第00001部分中输出
Female 35000
Male 31000
使用以下命令查看Part-00002文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
在00002部分中输出
Female 51000
Male 50000