📅  最后修改于: 2023-12-03 15:09:32.521000             🧑  作者: Mango
在 PySpark 中,RDD(Resilient Distributed Datasets)是最基本的数据处理单位。但是,由于 RDD 是强类型的数据集合,它不适合处理结构化数据,而 DataFrame 很适合处理结构化数据。因此,我们需要将 RDD 转换为 DataFrame。本文将介绍如何将 PySpark RDD 转换为 DataFrame。
在 PySpark 中,我们可以将 RDD 转换为 DataFrame。我们可以使用 toDF()
方法将 RDD 转换为 DataFrame。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("convertRDDtoDF").getOrCreate()
# 创建一个 RDD
rdd = spark.sparkContext.parallelize([(1, "John Doe", 22), (2, "Mike Smith", 33), (3, "Mary Johnson", 44)])
# 将 RDD 转换为 DataFrame
df = rdd.toDF(["id", "name", "age"])
# 显示 DataFrame
df.show()
输出:
+---+-------------+---+
| id| name|age|
+---+-------------+---+
| 1| John Doe| 22|
| 2| Mike Smith| 33|
| 3| Mary Johnson| 44|
+---+-------------+---+
如果你想更明确地指定 DataFrame 的 Schema,可以使用 createDataFrame()
方法。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = SparkSession.builder.appName("convertRDDtoDFwithSchema").getOrCreate()
# 创建一个 RDD
rdd = spark.sparkContext.parallelize([(1, "John Doe", 22), (2, "Mike Smith", 33), (3, "Mary Johnson", 44)])
# 定义 DataFrame 成员的类型
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
# 将 RDD 转换为 DataFrame
df = spark.createDataFrame(rdd, schema)
# 显示 DataFrame
df.show()
输出:
+---+-------------+---+
| id| name|age|
+---+-------------+---+
| 1| John Doe| 22|
| 2| Mike Smith| 33|
| 3| Mary Johnson| 44|
+---+-------------+---+
在将 RDD 转换为 DataFrame 时,如果 RDD 中存在缺失值,则我们需要使用 map()
方法将缺失值转换为 None。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = SparkSession.builder.appName("convertRDDtoDFwithMissingValues").getOrCreate()
# 创建一个 RDD,它包含缺失值
rdd_with_missing_values = spark.sparkContext.parallelize([(1, "John Doe", None), (2, None, 33), (3, "Mary Johnson", 44)])
# 将缺失值转换为 None
rdd_with_missing_values = rdd_with_missing_values.map(lambda x: (x[0], x[1] if x[1] else None, x[2] if x[2] else None))
# 定义 DataFrame 成员的类型
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
# 将 RDD 转换为 DataFrame
df = spark.createDataFrame(rdd_with_missing_values, schema)
# 显示 DataFrame
df.show()
输出:
+---+-------------+----+
| id| name| age|
+---+-------------+----+
| 1| John Doe|null|
| 2| null| 33|
| 3| Mary Johnson| 44|
+---+-------------+----+
通过本文,您学会了如何将 PySpark RDD 转换为 PySpark DataFrame。您还学会了如何处理缺失值。现在,您可以更轻松地使用 PySpark 处理结构化数据了。