📜  PySpark – Merge two DataFrames with different columns or schema

📅  Last modified: 2022-05-13 01:54:56.518000             🧑  Author: Mango

In this article, we will discuss how to merge two dataframes with a different number of columns or a different schema in PySpark in Python.

Let's consider the first dataframe:

For better demonstration purposes, here we have 3 columns named ID, NAME, and Address.

Python3
# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# display
dataframe1.show()


Output:
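
For reference, with the fixed data above, dataframe1.show() should print a table like the following (exact spacing may vary slightly across Spark versions):

+---+-------+--------+
| ID|   NAME| Address|
+---+-------+--------+
|  1| sravan|kakumanu|
|  2| ojaswi|     hyd|
|  3| rohith|   delhi|
|  4|sridevi|kakumanu|
|  5|  bobby|  guntur|
+---+-------+--------+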

Let's consider the second dataframe:

Here, we will create a dataframe with 2 columns, ID and Age.

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# display
dataframe2.show()

Output:
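
Again for reference, dataframe2.show() should print something like:

+---+---+
| ID|Age|
+---+---+
|  1| 23|
|  2| 21|
|  3| 32|
+---+---+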

We cannot merge the dataframes directly because their columns differ: the first dataframe (dataframe1) has the columns ['ID', 'NAME', 'Address'], while the second dataframe (dataframe2) has the columns ['ID', 'Age']. So we have to add the missing columns to each dataframe.

Now we have to add the Age column to the first dataframe, and the NAME and Address columns to the second dataframe. We can do this using the lit() function, which is available in pyspark.sql.functions and adds a column filled with a constant value. Here we will add the columns with the value None.

Syntax:

for column in [column for column in dataframe2.columns if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))

where,

  • dataframe1 is the first dataframe
  • dataframe2 is the second dataframe

(The same loop with the two dataframes swapped adds dataframe1's missing columns to dataframe2.)
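
One caveat: lit(None) creates a column of Spark's NullType. If you want the placeholder to carry a concrete type (for example, so the Age column stays an integer after merging), you can cast it explicitly. A minimal sketch, assuming the dataframe1 from above:

Python3

# add a typed null placeholder instead of a bare NullType column
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType

# Age is missing from dataframe1; add it as a null integer column
dataframe1 = dataframe1.withColumn('Age', lit(None).cast(IntegerType()))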

Adding the missing columns to both dataframes

Using the above syntax, we will add the Age column to the first dataframe, and the NAME and Address columns to the second dataframe.

Finally, we will display the column names of both dataframes.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# now see the columns of dataframe1
print(dataframe1.columns)
  
# now see the columns of dataframe2
print(dataframe2.columns)

Output:

['ID', 'NAME', 'Address', 'Age']
['ID', 'Age', 'NAME', 'Address']

Note that both dataframes now have the same four columns, but in a different order: Age is last in dataframe1 and second in dataframe2. This matters for the position-based union() below.

Merging the dataframes

Method 1: Using union()

This merges the dataframes by column position, not by column name.

Syntax:

dataframe1.union(dataframe2)

Example:

In this example, after adding the required columns to both dataframes, we merge them using the union() method. Finally, we display the merged dataframe.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform union
dataframe1.union(dataframe2).show()

Output:
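
Because union() matches columns by position and the two dataframes' columns are in different orders (see above), the rows coming from dataframe2 end up misaligned: their Age values land in the NAME column. Assuming a Spark version that renders missing values as null, the output should look roughly like:

+---+-------+--------+----+
| ID|   NAME| Address| Age|
+---+-------+--------+----+
|  1| sravan|kakumanu|null|
|  2| ojaswi|     hyd|null|
|  3| rohith|   delhi|null|
|  4|sridevi|kakumanu|null|
|  5|  bobby|  guntur|null|
|  1|     23|    null|null|
|  2|     21|    null|null|
|  3|     32|    null|null|
+---+-------+--------+----+

A minimal sketch of one way to avoid the misalignment is to reorder dataframe2's columns to match dataframe1 before the union:

Python3

# align dataframe2's column order with dataframe1, then union by position
dataframe1.union(dataframe2.select(dataframe1.columns)).show()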

Method 2: Using unionByName()

This merges the two dataframes based on column names rather than position, so the differing column order is not a problem here.

Syntax:

dataframe1.unionByName(dataframe2)

Example:

In this example, after adding the required columns to both dataframes, we merge them using the unionByName() method. Finally, we display the merged dataframe.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform unionByName
dataframe1.unionByName(dataframe2).show()

Output:
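
As a side note, on Spark 3.1 or later, unionByName() accepts an allowMissingColumns parameter that fills missing columns with nulls for you, so the manual lit(None) loops above are not needed at all. A minimal sketch, assuming the original dataframe1 and dataframe2 before any columns were added:

Python3

# Spark 3.1+: let unionByName fill in the missing columns with nulls
dataframe1.unionByName(dataframe2, allowMissingColumns=True).show()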

Method 3: Using unionAll()

In Spark 2.0 and later, unionAll() is an alias for union(), so it also merges the dataframes by column position.

Syntax:

dataframe1.unionAll(dataframe2)

Example:

In this example, after adding the required columns to both dataframes, we merge them using the unionAll() method. Finally, we display the merged dataframe.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform unionAll
dataframe1.unionAll(dataframe2).show()

Output:

Since unionAll() is an alias for union(), the result is identical to the union() output above, including the positional misalignment of dataframe2's rows.