📜  Distributed Cache in Hadoop MapReduce

📅  Last Modified: 2021-10-27 06:36:15             🧑  Author: Mango

Hadoop's MapReduce framework provides a facility to cache small to moderate-sized read-only files (text files, zip files, jar files, etc.) and broadcast them to all the DataNodes (worker nodes) on which a MapReduce job is running. Each DataNode gets its own local copy of the file sent through the distributed cache. Once the job finishes, these files are deleted from the DataNodes.

Distributed Cache

Why cache the files?

A MapReduce job may need the same file over and over. Instead of reading it from HDFS on every access (which adds seek time and therefore latency), say 100 times if 100 mappers are running, we simply ship a copy of the file to every DataNode once.
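
In the newer MapReduce API this boils down to two calls, both of which appear in the full example later in this article (a minimal sketch, not a complete program):

// Driver side: register an HDFS file with the job's distributed cache
job.addCacheFile(new URI("hdfs://localhost:9000/cached_Geeks/stopWords.txt"));

// Task side, e.g. in the Mapper's setup(): retrieve the URIs of the cached files
URI[] cacheFiles = context.getCacheFiles();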

Let's look at an example in which we count the words in lyrics.txt, excluding the words that appear in stopWords.txt. You can find these files here.

Prerequisites:

1. Copy both files from the local file system to HDFS.

bin/hdfs dfs -put ../Desktop/lyrics.txt  /geeksInput

// this file will be cached
bin/hdfs dfs -put ../Desktop/stopWords.txt /cached_Geeks
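
You can optionally confirm that both files are now in HDFS (the same paths as above):

bin/hdfs dfs -ls /geeksInput /cached_Geeks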

2. Get the address of the NameNode server. Since the cached file has to be accessed through a URI (Uniform Resource Identifier), we need this address. It can be found in core-site.xml:

Hadoop_Home_dir/etc/hadoop/core-site.xml

On my machine it is hdfs://localhost:9000; it may be different on yours.
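
The address is the value of the fs.defaultFS property in that file. Alternatively, you can query a running installation for it (assuming the hdfs command from your Hadoop installation):

bin/hdfs getconf -confKey fs.defaultFS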

Mapper code:

package word_count_DC;
  
import java.io.*;
import java.util.*;
import java.net.URI;
  
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
  
public class Cached_Word_Count extends Mapper<LongWritable, Text, Text, LongWritable> {

    ArrayList<String> stopWords = null;
  
    public void setup(Context context) throws IOException, 
                                     InterruptedException
    {
        stopWords = new ArrayList<String>();
  
        URI[] cacheFiles = context.getCacheFiles();
  
        if (cacheFiles != null && cacheFiles.length > 0) 
        {
            try {
  
                String line = "";
  
               // Create a FileSystem object and pass the 
               // configuration object in it. The FileSystem
               // is an abstract base class for a fairly generic
               // filesystem. All user code that may potentially 
               // use the Hadoop Distributed File System should
               // be written to use a FileSystem object.
                FileSystem fs = FileSystem.get(context.getConfiguration());
                Path getFilePath = new Path(cacheFiles[0].toString());
  
                // We open the file using FileSystem object, 
                // convert the input byte stream to character
                // streams using InputStreamReader and wrap it 
                // in BufferedReader to make it more efficient
                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(getFilePath)));
  
                while ((line = reader.readLine()) != null) 
                {
                    String[] words = line.split(" ");
  
                    for (int i = 0; i < words.length; i++) 
                    {
                        // add the words to ArrayList
                        stopWords.add(words[i]); 
                    }
                }
                reader.close();
            }
  
            catch (Exception e)
            {
                System.out.println("Unable to read the File");
                System.exit(1);
            }
        }
    }
  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String words[] = value.toString().split(" ");
  
        for (int i = 0; i < words.length; i++) 
        {
  
            // strip some punctuation (question marks, commas,
            // quotes, parentheses) and convert to lower case
            String temp = words[i].replaceAll("[?, '()]", "").toLowerCase();
  
            // if not present in ArrayList we write
            if (!stopWords.contains(temp)) 
            {
                context.write(new Text(temp), new LongWritable(1));
            }
        }
    }
}

Reducer code:

package word_count_DC;
  
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
  
public class Cached_Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  
    public void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException
    {
        long sum = 0;
  
        for (LongWritable val : values)
        {
            sum += val.get();
        }
  
        context.write(key, new LongWritable(sum));
    }
}

Driver code:

package word_count_DC;
  
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
  
public class Driver {
  
    public static void main(String[] args) throws IOException, 
                 InterruptedException, ClassNotFoundException
    {
  
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  
        if (otherArgs.length != 2) 
        {
            System.err.println("Error: Give only two paths for  ");
            System.exit(1);
        }
  
        Job job = Job.getInstance(conf, "Distributed Cache");
  
        job.setJarByClass(Driver.class);
        job.setMapperClass(Cached_Word_Count.class);
        job.setReducerClass(Cached_Reducer.class);
  
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
  
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
  
        try {
  
            // the complete URI(Uniform Resource 
            // Identifier) file path in Hdfs
            job.addCacheFile(new URI("hdfs://localhost:9000/cached_Geeks/stopWords.txt"));
        }
        catch (Exception e) {
            System.out.println("File Not Added");
            System.exit(1);
        }
  
        FileInputFormat.addInputPath(job, new Path(args[0]));
  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
        // throws ClassNotFoundException, so handle it
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    }
}

How to execute the code?

  1. Export the project as a jar file and copy it to your Ubuntu Desktop as distributedExample.jar.
  2. Start your Hadoop services. Go to hadoop_home_dir and in a terminal type:
    sbin/start-all.sh
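
    Optionally, run jps (shipped with the JDK) to verify that the daemons are up; in a typical single-node setup you should see processes such as NameNode, DataNode, ResourceManager and NodeManager.

    jps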
    
  3. Run the jar file, for example with the command below.
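
    A possible invocation, assuming the jar was exported to the Desktop as distributedExample.jar and using the HDFS paths from this article (adjust the jar name and paths to your setup):

    bin/hadoop jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput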

    Output:

    // will print the words starting with t
    
    bin/hdfs dfs -cat /geeksOutput/part* | grep ^t
    

    In the output, we can observe that none of the words we wanted to ignore, such as "the" or "this", appear.