📜  将 PySpark 行列表转换为 Pandas DataFrame

📅  最后修改于: 2022-05-13 01:55:00.715000             🧑  作者: Mango

将 PySpark 行列表转换为 Pandas DataFrame

在本文中,我们将 PySpark 行列表转换为 Pandas 数据框。 Row 对象被定义为 PySpark DataFrame 中的单个 Row。因此,数据框可以很容易地表示为行对象的Python列表。

方法一:使用createDataFrame()方法,使用toPandas()方法

下面是 createDataFrame() 方法的语法:

示例

在这个例子中,我们将 Row 列表作为数据传递并创建一个 PySpark DataFrame。然后我们将使用 toPandas() 方法获取 Pandas DataFrame。

Python
# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# creating PySpark DataFrame using createDataFrame()
df = row_pandas_session.createDataFrame(row_object_list)
 
# Printing the Spark DataFrame
df.show()
 
# Conversion to Pandas DataFrame
pandas_df = df.toPandas()
 
# Final Result
print(pandas_df)


Python
# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Creating an RDD
rdd = row_pandas_session.sparkContext.parallelize(row_object_list)
 
# DataFrame created using RDD
df = row_pandas_session.createDataFrame(rdd)
 
# Checking the DataFrame
df.show()
 
# Conversion of DataFrame
df2 = df.toPandas()
 
# Final DataFrame needed
print(df2)


Python
# Importing PySpark
# Importing Pandas for append()
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Our final DataFrame initialized
mega_df = pandas.DataFrame()
 
# Traversing through the list
for i in range(len(row_object_list)):
   
    # Creating a Spark DataFrame of a single row
    small_df = row_pandas_session.createDataFrame([row_object_list[i]])
 
    # appending the Pandas version of small_df
    # to mega_df
    mega_df = mega_df.append(small_df.toPandas(),
                             ignore_index=True)
 
# Printing our desired DataFrame
print(mega_df)


输出

方法2:使用parallelize()

我们将使用 parallelize() 创建一个 RDD。并行化意味着将存在于预定义集合中的元素复制到我们可以并行操作的分布式数据集。这是parallelize()的语法:

示例

在此示例中,我们将使用createDataFrame()创建 PySpark DataFrame,然后使用toPandas()获取 Pandas DataFrame。

Python

# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Creating an RDD
rdd = row_pandas_session.sparkContext.parallelize(row_object_list)
 
# DataFrame created using RDD
df = row_pandas_session.createDataFrame(rdd)
 
# Checking the DataFrame
df.show()
 
# Conversion of DataFrame
df2 = df.toPandas()
 
# Final DataFrame needed
print(df2)

输出

方法3:遍历Row list

在此方法中,我们将遍历 Row 列表,并使用createDataFrame() 将每个行对象转换为 DataFrame。然后,我们将 append() 这个 DataFrame 到一个累积的最终 DataFrame 中,这将是我们的最终答案。 append()的详细信息如下:

示例

在此示例中,我们将使用 createDataFrame() 创建 PySpark DataFrame,然后使用 append() 获取 Pandas DataFrame。

Python

# Importing PySpark
# Importing Pandas for append()
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Our final DataFrame initialized
mega_df = pandas.DataFrame()
 
# Traversing through the list
for i in range(len(row_object_list)):
   
    # Creating a Spark DataFrame of a single row
    small_df = row_pandas_session.createDataFrame([row_object_list[i]])
 
    # appending the Pandas version of small_df
    # to mega_df
    mega_df = mega_df.append(small_df.toPandas(),
                             ignore_index=True)
 
# Printing our desired DataFrame
print(mega_df)

输出 :