📅  最后修改于: 2023-12-03 15:41:42.920000             🧑  作者: Mango
Spark SQL是Apache Spark提供的用于处理结构化数据的模块。它提供了一种高级的数据处理API,称为DataFrame,以及一个SQL查询接口。
DataFrame是一种具有透明优化的分布式数据集合,可以在各种数据源上进行操作。它与关系数据库中的表非常相似。可以使用DataFrame API来对数据进行过滤、选择、聚合等操作。
可以通过从不同的数据源加载数据来创建DataFrame。常见的数据源有:
以下是通过从本地文件加载数据创建DataFrame的示例代码:
val spark = SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
val data = spark.read.option("header", "true").csv("path/to/file.csv")
val filteredData = data.filter(col("age") > 18)
filteredData.show()
DataFrame API是一套用于操作DataFrame的方法。其中一些常用的方法包括:
filter()
:对数据进行过滤。select()
:选择数据的一部分列。groupBy()
:对数据进行分组。agg()
:对每个组进行聚合操作。join()
:将两个DataFrame连接起来。以下是使用DataFrame API进行聚合操作的示例代码:
val spark = SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
val data = List((1, "a", 10), (1, "b", 20), (2, "a", 30), (2, "b", 40)).toDF("id", "col1", "col2")
val result = data.groupBy("id").agg(sum("col2"))
result.show()
除了DataFrame API之外,Spark SQL还提供了一个SQL查询接口。可以使用标准的SQL语法进行查询。以下是使用SQL查询从Hive表中选择数据的示例代码:
val spark = SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
spark.sql("SELECT * FROM my_table")
在使用SQL查询之前,需要将DataFrame注册为Spark SQL表。可以使用createOrReplaceTempView()
方法将DataFrame注册为Spark SQL表。以下是将DataFrame注册为Spark SQL表的示例代码:
val spark = SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
val data = List((1, "a", 10), (1, "b", 20), (2, "a", 30), (2, "b", 40)).toDF("id", "col1", "col2")
data.createOrReplaceTempView("my_table")
spark.sql("SELECT * FROM my_table")
Spark SQL是一个非常强大的模块,可以让我们像使用关系型数据库一样操作分布式数据。使用Spark SQL,我们可以轻松地从不同的数据源加载数据,并用DataFrame API或SQL查询对数据进行操作。