📅  最后修改于: 2020-11-29 08:01:31             🧑  作者: Mango
DataFrame是数据的分布式集合,被组织为命名列。从概念上讲,它等效于具有良好优化技术的关系表。
可以从不同来源的数组(例如Hive表,结构化数据文件,外部数据库或现有RDD)构造DataFrame。该API是为现代大数据和数据科学应用程序设计的,灵感来自R编程中的DataFrame和Python的Pandas 。
这是DataFrame的一些特点-
能够在单个节点群集到大型群集上处理千字节到PB大小的数据。
支持不同的数据格式(Avro,csv,弹性搜索和Cassandra)和存储系统(HDFS,HIVE表,mysql等)。
通过Spark SQL Catalyst优化器(树转换框架)进行最先进的优化和代码生成。
可以通过Spark-Core轻松地与所有大数据工具和框架集成。
提供适用于Python,Java,Scala和R编程的API。
SQLContext是一个类,用于初始化Spark SQL的功能。初始化SQLContext类对象需要SparkContext类对象(sc)。
以下命令用于通过spark-shell初始化SparkContext。
$ spark-shell
默认情况下,spark-shell启动时,SparkContext对象将使用名称sc初始化。
使用以下命令创建SQLContext。
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
让我们考虑一个名为employee.json的JSON文件中的员工记录示例。使用以下命令创建DataFrame(df),并读取具有以下内容的名为employee.json的JSON文档。
employee.json-将文件放置在当前scala>指针所在的目录中。
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
DataFrame为结构化数据操作提供了特定于域的语言。在这里,我们包括一些使用DataFrames进行结构化数据处理的基本示例。
请按照下面给出的步骤执行DataFrame操作-
首先,我们必须阅读JSON文档。基于此,生成一个名为(dfs)的DataFrame。
使用以下命令来读取名为employee.json的JSON文档。数据显示为带有字段-ID,名称和年龄的表格。
scala> val dfs = sqlContext.read.json("employee.json")
输出-字段名称是自动从employee.json获取的。
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
如果要查看DataFrame中的数据,请使用以下命令。
scala> dfs.show()
输出-您可以以表格格式查看员工数据。
:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
如果要查看DataFrame的结构(架构),请使用以下命令。
scala> dfs.printSchema()
输出
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
使用以下命令从DataFrame的三列中获取名称-column。
scala> dfs.select("name").show()
输出-您可以看到名称列的值。
:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
使用以下命令查找年龄大于23(年龄> 23)的员工。
scala> dfs.filter(dfs("age") > 23).show()
输出
:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
使用以下命令对相同年龄的员工数进行计数。
scala> dfs.groupBy("age").count().show()
输出-两名雇员的年龄为23岁。
:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
SQLContext使应用程序可以在运行SQL函数的同时以编程方式运行SQL查询,并将结果作为DataFrame返回。
通常,在后台,SparkSQL支持两种不同的方法将现有的RDD转换为DataFrames-
Sr. No | Methods & Description |
---|---|
1 | Inferring the Schema using Reflection
This method uses reflection to generate the schema of an RDD that contains specific types of objects. |
2 | Programmatically Specifying the Schema
The second method for creating DataFrame is through programmatic interface that allows you to construct a schema and then apply it to an existing RDD. |