Append data to an empty DataFrame in PySpark
In this article, we will see how to append data to an empty DataFrame in PySpark using the Python programming language.
Method 1: Make an empty DataFrame and take a union with a non-empty DataFrame that has the same schema
The union() function is central to this operation. It combines two DataFrames that share the same column schema; a short sketch follows the syntax below.
Syntax : FirstDataFrame.union(SecondDataFrame)
Returns : A DataFrame containing the rows of both DataFrames.
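As a minimal, self-contained sketch (the session name, DataFrames and columns here are illustrative, not part of the examples below), union() stacks the rows of two DataFrames. Note that union() matches columns purely by position; if the two DataFrames list their columns in different orders, unionByName() is the safer choice.
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Union_Sketch').getOrCreate()

# Two tiny DataFrames that share the same column schema
df_a = spark.createDataFrame([('A', 1)], ['letter', 'number'])
df_b = spark.createDataFrame([('B', 2)], ['letter', 'number'])

# union() stacks the rows of both DataFrames by column position
df_a.union(df_b).show()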
Example:
In this example, we create a DataFrame with a particular schema and data, create an EMPTY DataFrame with the same schema, and take a union of the two DataFrames using the union() function.
Python
# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()

# Creating an empty RDD to make a
# DataFrame with no data
emp_RDD = spark_session.sparkContext.emptyRDD()

# Defining the schema of the DataFrame
columns1 = StructType([StructField('Name', StringType(), False),
                       StructField('Salary', IntegerType(), False)])

# Creating an empty DataFrame
first_df = spark_session.createDataFrame(data=emp_RDD,
                                         schema=columns1)

# Printing the DataFrame with no data
first_df.show()

# Hardcoded data for the second DataFrame
rows = [['Ajay', 56000], ['Srikanth', 89078],
        ['Reddy', 76890], ['Gursaidutt', 98023]]
columns = ['Name', 'Salary']

# Creating the non-empty DataFrame
second_df = spark_session.createDataFrame(rows, columns)

# Printing the non-empty DataFrame
second_df.show()

# Storing the union of first_df and
# second_df in first_df
first_df = first_df.union(second_df)

# Our first DataFrame that was empty
# now has data
first_df.show()
Output :
+----+------+
|Name|Salary|
+----+------+
+----+------+
+----------+------+
| Name|Salary|
+----------+------+
| Ajay| 56000|
| Srikanth| 89078|
| Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+
+----------+------+
| Name|Salary|
+----------+------+
| Ajay| 56000|
| Srikanth| 89078|
| Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+
Method 2: Add a single row to an empty DataFrame by converting the row into a DataFrame
We can convert a single row, given as a Python list, using createDataFrame(). The details of createDataFrame(), followed by a short sketch, are:
Syntax : CurrentSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Parameters :
- data : RDD, list or pandas.DataFrame: The data from which the DataFrame is created.
- schema : str/list, optional: A string or a list of column names.
- samplingRatio : float, optional: The ratio of rows sampled when inferring the schema.
- verifySchema : bool, optional: Verify the data types of every row against the specified schema. True by default.
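A quick, hedged sketch of these parameters (the session name is illustrative, and the schema is given as a datatype string, a form recent Spark versions accept alongside StructType schemas and column-name lists):
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Create_Sketch').getOrCreate()

# data as a list of rows, schema as a datatype string;
# verifySchema=True (the default) checks every row against it
row_df = spark.createDataFrame(
    data=[('Motera Stadium', 132000)],
    schema='Stadium: string, Capacity: int',
    verifySchema=True)
row_df.show()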
Example:
In this example, we create a DataFrame with a particular schema and a single row, create an EMPTY DataFrame with the same schema using createDataFrame(), take a union of the two DataFrames with the union() function, store the result back in the previously empty DataFrame, and view the changes using show().
Python
# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()

# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()

# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])

# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)

# Printing the DataFrame with no data
df.show()

# Hardcoded row to be added
added_row = [['Motera Stadium', 132000]]

# Creating a one-row DataFrame from the row
added_df = spark_session.createDataFrame(added_row, columns)

# Storing the union of df and added_df in df
df = df.union(added_df)

# Our DataFrame that was empty now has data
df.show()
Output :
+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+
+--------------+--------+
| Stadium|Capacity|
+--------------+--------+
|Motera Stadium| 132000|
+--------------+--------+
Method 3: Convert the empty DataFrame to a Pandas DataFrame and use the append() function
We will use toPandas() to convert the PySpark DataFrame to a Pandas DataFrame (a short sketch follows). Its syntax is:
Syntax : PySparkDataFrame.toPandas()
Returns : Corresponding Pandas DataFrame
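A minimal sketch of the conversion (the session name and data are illustrative). Keep in mind that toPandas() collects all rows to the driver, so it is only suitable for DataFrames small enough to fit in driver memory:
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ToPandas_Sketch').getOrCreate()
spark_df = spark.createDataFrame([('Motera Stadium', 132000)],
                                 ['Stadium', 'Capacity'])

# Collects the distributed rows to the driver and returns
# a pandas.DataFrame
pandas_df = spark_df.toPandas()
print(type(pandas_df))  # <class 'pandas.core.frame.DataFrame'>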
We will then use the Pandas append() function (see the version caveat after the parameter list). Its syntax is:
Syntax : PandasDataFrame.append(other, ignore_index=False, verify_integrity=False, sort=False)
Parameters :
- other : Pandas DataFrame, Numpy Series, etc.: The data to be appended.
- ignore_index : bool: If True, the indexes of the new DataFrame have no relation to the indexes of the older ones.
- sort : bool: Sort the columns if the column alignment of other and PandasDataFrame differs.
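One caveat worth flagging: DataFrame.append() was deprecated in pandas 1.4 and removed in pandas 2.0, where pd.concat() is the equivalent call. A minimal sketch of both (the DataFrame contents are illustrative):
Python
import pandas as pd

empty_pdf = pd.DataFrame(columns=['Stadium', 'Capacity'])
row_pdf = pd.DataFrame([['Motera Stadium', 132000]],
                       columns=['Stadium', 'Capacity'])

# On pandas < 2.0, as used in this article:
# result = empty_pdf.append(row_pdf, ignore_index=True)

# On pandas 2.0+, the equivalent call:
result = pd.concat([empty_pdf, row_pdf], ignore_index=True)
print(result)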
Example:
Here we create an empty DataFrame to which the data is to be added. We then convert the data to be added into a Spark DataFrame using createDataFrame(), convert both DataFrames to Pandas DataFrames with toPandas(), and use the append() function to add the non-empty DataFrame to the empty one, ignoring the indexes since we are producing a new DataFrame. Finally, we convert the resulting Pandas DataFrame back to a Spark DataFrame with createDataFrame().
Python
# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()

# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()

# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])

# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)

# Printing the DataFrame with no data
df.show()

# Hardcoded row to be added
added_row = [['Motera Stadium', 132000]]

# Creating the DataFrame whose data
# needs to be added
added_df = spark_session.createDataFrame(added_row,
                                         columns)

# Converting our PySpark DataFrames to
# Pandas DataFrames
pandas_added = added_df.toPandas()
df = df.toPandas()

# Using append() to add the data; note that
# DataFrame.append() was removed in pandas 2.0,
# where pd.concat([df, pandas_added]) is the equivalent
df = df.append(pandas_added, ignore_index=True)

# Reconverting our DataFrame back
# to a PySpark DataFrame
df = spark_session.createDataFrame(df)

# Printing the resultant DataFrame
df.show()
Output :
+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+
+--------------+--------+
| Stadium|Capacity|
+--------------+--------+
|Motera Stadium| 132000|
+--------------+--------+