MapReduce Program – Weather Data Analysis for Analyzing Hot and Cold Days

Last modified: 2021-10-27 06:31:28 | Author: Mango

Here we will write a MapReduce program to analyze a weather dataset and understand its data-processing programming model. Weather sensors all over the globe collect weather information and produce large volumes of log data. This weather data is semi-structured and record-oriented.

The data is stored in a line-oriented ASCII format, where each line represents one record. Each line has many fields, such as longitude, latitude, daily maximum and minimum temperature, daily average temperature, and so on. For simplicity, we will focus on the main element: temperature. We will use data from the National Centers for Environmental Information (NCEI), which provides a large amount of historical weather data that we can use for analysis.

Problem Statement:

Analyzing weather data of Fairbanks, Alaska to find cold and hot days using MapReduce Hadoop.

Step 1:

We can download datasets for different cities and different years from this link. Pick a year of your choice and select any one data text file for analysis. In my case, I have selected the CRND0103-2020-AK_Fairbanks_11_NE.txt dataset to analyze hot and cold days in Fairbanks, Alaska.

We can get information about the data from the README.txt file available on the NCEI website.

Step 2:

Below is a sample of our dataset, where columns 6 and 7 show the maximum and minimum temperature, respectively.

Minimum and maximum temperature fields in the dataset
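Because the records use fixed-width columns, individual fields can be extracted by character offsets. Below is a minimal sketch using the same offsets as the mapper later in this article (characters 6–14 for the date, 39–45 for the maximum temperature, 47–53 for the minimum); the record line is hypothetical and only illustrates the layout:

    public class FieldOffsetDemo {
        public static void main(String[] args) {
            // a hypothetical fixed-width record (not real NCEI data),
            // padded so the offsets match the real files
            String line = "12345 20200101"
                    + "                         "   // 25 filler characters
                    + " -10.5" + "  " + " -25.3";

            String date = line.substring(6, 14);                              // "20200101"
            float tempMax = Float.parseFloat(line.substring(39, 45).trim());  // -10.5
            float tempMin = Float.parseFloat(line.substring(47, 53).trim());  // -25.3

            System.out.println(date + "  max=" + tempMax + "  min=" + tempMin);
        }
    }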

Step 3:

Create a project in Eclipse with the following steps:

  • First open Eclipse -> then select File -> New -> Java Project -> name it MyProject -> then select Use an execution environment -> choose JavaSE-1.8, then Next -> Finish

    create-java-project

  • In this project, create a Java class named MyMaxMin -> then click Finish
    create Java class
  • Copy the source code below into this MyMaxMin Java class
    // importing Libraries 
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.conf.Configuration;
      
    public class MyMaxMin {
      
          
        // Mapper
          
    /* The MaxTemperatureMapper class is static
     * and extends the Mapper abstract class,
     * parameterized with four Hadoop generic types:
     * LongWritable, Text, Text, Text.
    */
          
          
    public static class MaxTemperatureMapper extends
            Mapper<LongWritable, Text, Text, Text> {
              
        /**
        * @method map
        * This method takes the input as a text data type.
        * Skipping the leading fields, it reads the
        * maximum temperature as temp_max and the
        * minimum temperature as temp_min. Records with
        * temp_max > 30 or temp_min < 15 are
        * passed to the reducer.
        */
      
    // records carrying this value in our
    // dataset indicate missing/inconsistent data
    public static final int MISSING = 9999;
              
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Convert the single row (record) to
            // a String and store it in the
            // String variable named line
            String line = value.toString();
                  
                // Check for the empty line
                if (!(line.length() == 0)) {
                      
                    // from character 6 to 14 we have
                    // the date in our dataset
                    String date = line.substring(6, 14);
      
                    // similarly we have taken the maximum 
                    // temperature from 39 to 45 characters
                    float temp_Max = Float.parseFloat(line.substring(39, 45).trim());
                      
                    // similarly we have taken the minimum 
                    // temperature from 47 to 53 characters
                      
                    float temp_Min = Float.parseFloat(line.substring(47, 53).trim());
      
                    // if maximum temperature is
                    // greater than 30, it is a hot day
                    if (temp_Max > 30.0) {
                          
                        // Hot day
                        context.write(new Text("The Day is Hot Day :" + date),
                                             new Text(String.valueOf(temp_Max)));
                    }
      
                    // if the minimum temperature is 
                    // less than 15, it is a cold day
                    if (temp_Min < 15) {
                          
                        // Cold day
                        context.write(new Text("The Day is Cold Day :" + date),
                                new Text(String.valueOf(temp_Min)));
                    }
                }
            }
      
        }
      
    // Reducer
          
    /* The MaxTemperatureReducer class is static
       and extends the Reducer abstract class,
       parameterized with four Hadoop generic types:
       Text, Text, Text, Text.
    */
          
    public static class MaxTemperatureReducer extends
            Reducer<Text, Text, Text, Text> {
      
        /**
        * @method reduce
        * This method takes a key and the list of
        * values for that key from the mapper,
        * aggregates by key, and writes the
        * final output via the context.
        */
              
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // every key here is unique (it embeds the date),
            // so write out the first value received for the key
            String temperature = values.iterator().next().toString();
            context.write(key, new Text(temperature));
        }
      
        }
      
      
      
        /**
        * @method main
        * This method is used for setting
        * all the configuration properties.
        * It acts as a driver for map-reduce
        * code.
        */
          
        public static void main(String[] args) throws Exception {
      
            // reads the default configuration of the 
            // cluster from the configuration XML files
            Configuration conf = new Configuration();
              
        // Initializing the job with the
        // default configuration of the cluster
        Job job = Job.getInstance(conf, "weather example");
              
            // Assigning the driver class name
            job.setJarByClass(MyMaxMin.class);
      
            // Key type coming out of mapper
            job.setMapOutputKeyClass(Text.class);
              
            // value type coming out of mapper
            job.setMapOutputValueClass(Text.class);
      
            // Defining the mapper class name
            job.setMapperClass(MaxTemperatureMapper.class);
              
            // Defining the reducer class name
            job.setReducerClass(MaxTemperatureReducer.class);
      
            // Defining input Format class which is 
            // responsible to parse the dataset 
            // into a key value pair
            job.setInputFormatClass(TextInputFormat.class);
              
            // Defining output Format class which is
            // responsible to parse the dataset
            // into a key value pair
            job.setOutputFormatClass(TextOutputFormat.class);
      
            // setting the second argument
            // as a path in a path variable
            Path OutputPath = new Path(args[1]);
      
            // Configuring the input path
            // from the filesystem into the job
            FileInputFormat.addInputPath(job, new Path(args[0]));
      
            // Configuring the output path from 
            // the filesystem into the job
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
        // deleting the output path automatically
        // from HDFS so that we don't have
        // to delete it explicitly
        OutputPath.getFileSystem(conf).delete(OutputPath, true);
      
            // exiting the job only if the
            // flag value becomes false
            System.exit(job.waitForCompletion(true) ? 0 : 1);
      
        }
    }
    
  • Now we need to add external jars for the packages we have imported. Download the jar files for Hadoop Common and Hadoop MapReduce Core according to your Hadoop version (see the note on typical jar names after this list).

    You can check your Hadoop version with:

    hadoop version

    checking the hadoop version

  • Now we add these external jars to our MyProject. Right-click MyProject -> then select Build Path -> click Configure Build Path, select Add External JARs…, add the jars from their download location, then click -> Apply and Close

    adding external jar files to our project

  • Now export the project as a jar file. Right-click MyProject, select Export… and go to Java -> JAR file, click -> Next and choose the export destination, then click -> Next.
    Choose the Main Class as MyMaxMin by clicking -> Browse, then click -> Finish -> Ok

    export-java-MyProject

    selecting the main class
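The exact jar file names depend on your Hadoop release. As a hypothetical example for a Hadoop 3.3.x installation, the two jars mentioned above would look roughly like this (your version numbers may differ):

hadoop-common-3.3.1.jar
hadoop-mapreduce-client-core-3.3.1.jar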

Step 4:

Start our Hadoop daemons:

start-dfs.sh
start-yarn.sh
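To confirm that the daemons came up, you can list the running Java processes with the jps tool that ships with the JDK. On a typical single-node setup you would expect to see NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager:

jps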

Step 5:

Move your dataset to Hadoop HDFS.

Syntax:

hdfs dfs -put /file_path /destination

In the command below, / denotes the root directory of our HDFS.

hdfs dfs -put /home/dikshant/Downloads/CRND0103-2020-AK_Fairbanks_11_NE.txt /

Check the file sent to our HDFS:

hdfs dfs -ls /

copying the dataset to our HDFS

Step 6:

Now run your jar file with the following command and generate the output in a MyOutput directory.

Syntax:

hadoop jar /jar_file_location /dataset_location_in_HDFS /output-file_name

Command:

hadoop jar /home/dikshant/Documents/Project.jar /CRND0103-2020-AK_Fairbanks_11_NE.txt /MyOutput

running our jar file for the analysis
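As a quick sanity check (assuming the job wrote to /MyOutput as above), the output directory should now contain an empty _SUCCESS marker and the part-r-00000 result file:

hdfs dfs -ls /MyOutput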

Step 7:

Now go to localhost:50070/ (on Hadoop 3.x the NameNode web UI is at localhost:9870 instead), select Browse the file system under Utilities, and download part-r-00000 from the /MyOutput directory to see the result.

hdfs-view-1

hdfs-view-2
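Alternatively, you can inspect or fetch the result directly from the command line instead of going through the web UI:

hdfs dfs -cat /MyOutput/part-r-00000
hdfs dfs -get /MyOutput/part-r-00000 /home/dikshant/Downloads/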

Step 8:

See the result in the downloaded file.

top 10 results obtained

In the image above, you can see the top 10 results showing cold days. The second column contains the date in yyyymmdd format. For example, 20200101 means

year = 2020
month = 01
day = 01
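Each line of part-r-00000 is the mapper key and the temperature value joined by a tab, the default key-value separator of TextOutputFormat. As a minimal sketch, a hypothetical output line (the temperature shown is made up) can be broken apart like this:

    public class ParseOutputDemo {
        public static void main(String[] args) {
            // hypothetical output line: key and value separated by a tab
            String outputLine = "The Day is Cold Day :20200101\t-19.5";

            String[] parts = outputLine.split("\t");
            String date = parts[0].substring(parts[0].indexOf(':') + 1); // "20200101"

            System.out.println("year  = " + date.substring(0, 4));  // 2020
            System.out.println("month = " + date.substring(4, 6));  // 01
            System.out.println("day   = " + date.substring(6, 8));  // 01
            System.out.println("temp  = " + parts[1]);              // -19.5
        }
    }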