📜  Spark SQL教程(1)

📅  最后修改于: 2023-12-03 14:47:31.499000             🧑  作者: Mango

Spark SQL教程

Spark SQL是用于处理结构化数据的Spark模块。它提供了许多功能,包括将数据加载到分布式内存中进行查询,将查询结果导出到不同格式的文件中,通过JDBC / ODBC与传统的大数据存储系统进行交互,等等。在这个教程中,我们将介绍Spark SQL的一些基础知识以及如何使用它来处理和分析数据。

安装Spark SQL

首先,我们需要下载并安装Apache Spark。您可以从官方网站[https://spark.apache.org/downloads.html]下载最新版本。安装完成后,您需要启动Spark Shell以开始使用Spark SQL。在终端中,输入以下命令来启动Spark Shell:

$ spark-shell
创建DataFrame

Spark SQL最重要的概念之一是DataFrame,它是一种由行和列组成的分布式数据集。可以将DataFrame视为关系数据库中的表。我们可以从CSV文件,Hive表或RDD来创建DataFrame。

从CSV文件创建DataFrame

以下是从CSV文件创建DataFrame的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("path/to/csv/file.csv")

在这个示例中,我们使用Spark的CSV数据源来读取CSV文件。我们使用“header”选项指示我们的CSV文件包含标题,使用“inferSchema”选项来自动推断列中的数据类型。

从Hive表创建DataFrame

以下是从Hive表创建DataFrame的示例:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val df = hiveContext.table("database.table")

在这个示例中,我们使用HiveContext来连接到Hive metastore,然后读取以前创建的Hive表。

从RDD创建DataFrame

以下是从RDD创建DataFrame的示例:

val rdd = spark.sparkContext.parallelize(Seq(
  (0, "John", "United States"),
  (1, "Alex", "Canada"),
  (2, "Bob", "United Kingdom")
))
val df = rdd.toDF("id", "name", "country")

在这个示例中,我们创建了一个包含三个元组的RDD,然后将其转换为DataFrame。我们手动指定了列名称,即id,name和country。

DataFrame的基本操作

我们可以使用DataFrame执行许多基本操作,例如选择,过滤,聚合,排序等。

选择列

以下是选择列的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.select("column1", "column2")

在这个示例中,我们使用select方法选择两列column1和column2。

过滤行

以下是过滤行的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.filter("column1 > 10")

在这个示例中,我们使用filter方法过滤出列column1大于10的行。

聚合数据

以下是聚合数据的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.groupBy("column1").agg(sum("column2"))

在这个示例中,我们使用groupBy方法指定列column1,然后使用agg方法计算列column2的总和。

排序数据

以下是排序数据的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.sort("column1")

在这个示例中,我们使用sort方法按列column1对DataFrame进行排序。

将DataFrame的结果保存到文件中

我们可以使用DataFrame将结果保存到不同格式的文件中,例如CSV,JSON,Parquet等。

将结果保存到CSV文件中

以下是将结果保存到CSV文件中的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.write.format("csv")
  .option("header", "true")
  .save("path/to/save/file.csv")

在这个示例中,我们使用write方法和CSV数据源将DataFrame中的结果保存到CSV文件中。

将结果保存到JSON文件中

以下是将结果保存到JSON文件中的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.write.format("json")
  .save("path/to/save/file.json")

在这个示例中,我们使用write方法和JSON数据源将DataFrame中的结果保存到JSON文件中。

将结果保存到Parquet文件中

以下是将结果保存到Parquet文件中的示例:

val df = spark.read.format("csv")
  .option("header", "true")
  .load("path/to/csv/file.csv")
df.write.format("parquet")
  .save("path/to/save/file.parquet")

在这个示例中,我们使用write方法和Parquet数据源将DataFrame中的结果保存到Parquet文件中。