📜  Apache Flink-创建Flink应用程序

📅  最后修改于: 2020-10-30 10:09:37             🧑  作者: Mango


在本章中,我们将学习如何创建Flink应用程序。

打开Eclipse IDE,单击“新建项目”,然后选择“ Java项目”。

创建Flink应用程序

输入项目名称,然后单击完成。

创建Flink应用程序2

现在,单击完成,如以下屏幕截图所示。

创建Flink应用程序3

现在,右键单击src并转到New >> Class。

创建Flink应用程序4

输入课程名称,然后单击完成。

创建Flink应用程序5

将以下代码复制并粘贴到编辑器中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet text = env.readTextFile(params.get("input"));
      DataSet> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction> {
      public void flatMap(String value, Collector> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

在编辑器中会出现很多错误,因为Flink库需要添加到该项目中。

Flink库已添加

右键单击项目>>构建路径>>配置构建路径。

右键单击项目

选择库选项卡,然后单击添加外部JAR。

选择图书馆

转到Flink的lib目录,选择所有4个库,然后单击OK。

Flinks lib目录

转到“订购和导出”选项卡,选择所有库,然后单击“确定”。

订单和导出标签

您将看到错误不再存在。

现在,让我们导出此应用程序。右键单击项目,然后单击导出。

导出此应用程序

选择JAR文件,然后单击下一步。

选择JAR文件

提供目标路径,然后单击下一步

目标路径

点击下一步>

点击下一步

单击浏览,选择主类(WordCount),然后单击完成。

点击完成

注意-如果收到任何警告,单击“确定”。

运行以下命令。它将进一步运行您刚创建的Flink应用程序。

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output

得到警告