📜  Apache Flink-API概念(1)

📅  最后修改于: 2023-12-03 14:39:16.261000             🧑  作者: Mango

Apache Flink API概念

Apache Flink是一个高性能、分布式、可扩展的数据流处理框架。它可以处理和分析无限量的数据,支持数据流处理和批处理两种模式。

Flink API

Flink提供了三种用于处理数据的API:

  • DataSet API: 适用于有限数据集的批处理API。
  • DataStream API: 适用于无限数据流的实时处理API。
  • Table API & SQL:可以通过类似关系型数据库的语法创造类似于SQL的语法的语言,适合无论是批处理还是流处理,对于对数据的简单的处理,可以非常简单的实现。
DataSet API

DataSet API通过提供各种操作符来处理有限数据集。这些操作符可以与Java、Scala和Python编程语言一起使用。

下面是一个简单的DataSet API示例,演示了如何读取并处理文本文件:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCount {

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/path/to/your/file")
                                  .flatMap(new LineSplitter());

        DataSet<Tuple2<String, Integer>> counts = text.groupBy(0).sum(1);

        counts.print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
DataStream API

DataStream API通过提供各种操作符来处理无限数据流。这些操作符可以与Java、Scala一起使用。

下面是一个简单的DataStream API示例,演示了如何读取并处理套接字流:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.socket.SocketTextStreamFunction;
import org.apache.flink.util.Collector;

public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("hostname", 9999);

        DataStream<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter())
                                                       .keyBy(0)
                                                       .sum(1);

        counts.print();

        env.execute("Socket TextStream WordCount");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
Table API & SQL

Table API & SQL提供了一种与传统关系型数据库相似的API和查询语言,它可以查询无限量的数据流和有限数据集。

下面是一个简单的Table API & SQL示例,用于查询数据流中不同单词的计数:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class WordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        );

        tableEnv.createTemporaryView(
            "words",
            env.fromElements("Hello", "Flink", "Hello", "TableAPI", "TableAPI", "SQL")
                .map(s -> Row.of(s)),
                TableSchema.builder().field("word", DataTypes.STRING()).build()
        );

        Table results = tableEnv.sqlQuery("SELECT word, COUNT(*) FROM words GROUP BY word");

        results.toRetractStream(Row.class).print();

        env.execute("Word Count");
    }
}

更多关于Flink API的信息可以在Flink官方网站上找到。