Converting a List of PySpark Rows to a Pandas DataFrame
In this article, we convert a list of PySpark Row objects into a Pandas DataFrame. A Row object represents a single row of a PySpark DataFrame, so a DataFrame can easily be represented as a Python list of Row objects.
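As a quick sketch of the idea (the field names below are arbitrary), a Row is built from keyword arguments and its fields can be read back by name:
Python
from pyspark.sql import Row

# A Row behaves like a named tuple: fields are set with keyword arguments
sample_row = Row(Topic='Arrays', Difficulty=5)

print(sample_row.Topic)          # 'Arrays'
print(sample_row['Difficulty'])  # 5
print(sample_row.asDict())       # {'Topic': 'Arrays', 'Difficulty': 5}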
Method 1: Using the createDataFrame() method together with the toPandas() method
Below is the syntax of the createDataFrame() method:
Syntax : current_session.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Parameters :
- data : a resilient distributed dataset (RDD), a list, or a pandas.DataFrame whose elements are SQL data representations (Row, tuple, etc.).
- schema : a string or a list of column names for the DataFrame.
- samplingRatio -> float: the ratio of rows sampled when inferring the schema.
- verifySchema -> bool: check whether the datatypes of the rows match the specified schema.
Returns : PySpark DataFrame object.
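As a small, hedged sketch of these parameters (the session name and sample values are made up for illustration), the schema argument can simply be a list of column names when the data is passed as plain tuples:
Python
from pyspark.sql import SparkSession

# Hypothetical session used only for this sketch
spark = SparkSession.builder.appName('createDataFrame_sketch').getOrCreate()

# Plain tuples as data, a list of column names as the schema
df = spark.createDataFrame(
    [('Arrays', 5), ('Sorting', 6)],
    schema=['Topic', 'Difficulty'])

df.printSchema()
df.show()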
Example:
In this example, we pass the list of Row objects as the data argument to create a PySpark DataFrame, and then use the toPandas() method to get a Pandas DataFrame.
Python
# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
'row_pandas_session'
).getOrCreate()
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
# creating PySpark DataFrame using createDataFrame()
df = row_pandas_session.createDataFrame(row_object_list)
# Printing the Spark DataFrame
df.show()
# Conversion to Pandas DataFrame
pandas_df = df.toPandas()
# Final Result
print(pandas_df)
Output:
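Note: toPandas() collects the entire distributed DataFrame onto the driver, so it should only be used when the data fits in driver memory. On Spark 3.x, installing PyArrow and enabling Arrow-based transfer can speed up the conversion; the sketch below shows the relevant configuration key (the appName and sample row are arbitrary):
Python
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName('arrow_sketch').getOrCreate()

# Enable Arrow-based columnar transfer (requires PyArrow to be installed)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame([Row(Topic='Graphs', Difficulty=8)])
print(df.toPandas())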
Method 2: Using parallelize()
We will create an RDD using parallelize(). Parallelizing means copying the elements of an existing collection into a distributed dataset that we can operate on in parallel. Below is the syntax of parallelize():
Syntax : sc.parallelize(data,numSlices)
sc : Spark Context Object
Parameters :
- data : the data from which the RDD is to be created.
- numSlices : number of partitions that need to be made. This is an optional parameter.
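As a hedged illustration (the values and partition count are arbitrary), numSlices controls how many partitions the resulting RDD is split into:
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('parallelize_sketch').getOrCreate()
sc = spark.sparkContext

# Distribute a small local list across 2 partitions
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], numSlices=2)

print(rdd.getNumPartitions())  # 2
print(rdd.collect())           # [1, 2, 3, 4, 5, 6]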
Example:
In this example, we first build an RDD from the Row list with parallelize(), create a PySpark DataFrame from it using createDataFrame(), and then use toPandas() to get a Pandas DataFrame.
Python
# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
'row_pandas_session'
).getOrCreate()
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
# Creating an RDD
rdd = row_pandas_session.sparkContext.parallelize(row_object_list)
# DataFrame created using RDD
df = row_pandas_session.createDataFrame(rdd)
# Checking the DataFrame
df.show()
# Conversion of DataFrame
df2 = df.toPandas()
# Final DataFrame needed
print(df2)
Output:
Method 3: Iterating over the Row list
In this method, we iterate over the Row list and convert each Row object into a DataFrame using createDataFrame(). We then append() that DataFrame (converted to Pandas) to an accumulating final DataFrame, which is our final answer. The details of append() are as follows:
Syntax: df.append(other, ignore_index=False, verify_integrity=False, sort=None)
df : Pandas DataFrame
Parameters :
- other : a Pandas DataFrame or Series, a dict-like object, or a list of these.
- ignore_index : if True, the resulting index is relabeled 0, 1, ..., n - 1 instead of reusing the original index labels.
- verify_integrity : If True, raise ValueError on creating index with duplicates.
- sort : Sort columns if the columns of df and other are unaligned.
Returns : A new DataFrame with the rows of other appended.
Example:
In this example, we create a single-row PySpark DataFrame for each Row using createDataFrame() and then build the final Pandas DataFrame with append().
Python
# Importing PySpark
# Importing Pandas for append()
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
'row_pandas_session'
).getOrCreate()
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
# Our final DataFrame initialized
mega_df = pandas.DataFrame()
# Traversing through the list
for i in range(len(row_object_list)):
    # Creating a Spark DataFrame of a single row
    small_df = row_pandas_session.createDataFrame([row_object_list[i]])
    # appending the Pandas version of small_df
    # to mega_df
    mega_df = mega_df.append(small_df.toPandas(),
                             ignore_index=True)
# Printing our desired DataFrame
print(mega_df)
Output:
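Note: DataFrame.append() was deprecated in pandas 1.4 and removed in pandas 2.0, so the loop above fails on newer pandas versions. Under that assumption, the same accumulation can be expressed with pandas.concat(); a minimal sketch (using a shortened Row list for brevity):
Python
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName('concat_sketch').getOrCreate()

row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5)]

# Convert each single-row Spark DataFrame to pandas, then concatenate once
partial_frames = [spark.createDataFrame([row]).toPandas()
                  for row in row_object_list]
mega_df = pandas.concat(partial_frames, ignore_index=True)

print(mega_df)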