Concatenate two PySpark DataFrames
In this article, we will see how to concatenate two PySpark DataFrames using Python.
Creating a DataFrame for demonstration:
Python3
# Importing necessary libraries
from pyspark.sql import SparkSession

# Create a spark session
spark = SparkSession.builder.appName('pyspark - example join').getOrCreate()

# Create data in dataframe
data = [('Ram', '1991-04-01', 'M', 3000),
        ('Mike', '2000-05-19', 'M', 4000),
        ('Rohini', '1978-09-05', 'M', 4000),
        ('Maria', '1967-12-01', 'F', 4000),
        ('Jenis', '1980-02-17', 'F', 1200)]

# Column names in dataframe
columns = ["Name", "DOB", "Gender", "salary"]

# Create the spark dataframe
df1 = spark.createDataFrame(data=data, schema=columns)

# Print the dataframe
df1.show()
Output:
+------+----------+------+------+
| Name| DOB|Gender|salary|
+------+----------+------+------+
| Ram|1991-04-01| M| 3000|
| Mike|2000-05-19| M| 4000|
|Rohini|1978-09-05| M| 4000|
| Maria|1967-12-01| F| 4000|
| Jenis|1980-02-17| F| 1200|
+------+----------+------+------+
Creating a second DataFrame for demonstration:
Python3
# Create data in dataframe
data2 = [('Mohi', '1991-04-01', 'M', 3000),
         ('Ani', '2000-05-19', 'F', 4300),
         ('Shipta', '1978-09-05', 'F', 4200),
         ('Jessy', '1967-12-01', 'F', 4010),
         ('kanne', '1980-02-17', 'F', 1200)]

# Column names in dataframe
columns = ["Name", "DOB", "Gender", "salary"]

# Create the spark dataframe
df2 = spark.createDataFrame(data=data2, schema=columns)

# Print the dataframe
df2.show()
Output:
+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+
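The methods below match columns either by position or by name, so before combining it can help to confirm that the two DataFrames really share the same schema. A minimal sanity-check sketch, assuming the df1 and df2 built above:
Python3
# Optional sanity check: union() requires schemas that line up by position
df1.printSchema()
df2.printSchema()

# Both DataFrames were built from the same column list, so this holds
assert df1.schema == df2.schema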
Method 1: Using union()
The union() method of DataFrame combines two DataFrames with an equivalent structure/schema, matching columns by position.
Syntax: dataframe_1.union(dataframe_2)
where,
- dataframe_1 is the first dataframe
- dataframe_2 is the second dataframe
Example:
Python3
# Union the two dataframes created above
result = df1.union(df2)
# display
result.show()
Output:
+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+
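Note that union() follows SQL UNION ALL semantics, so duplicate rows are kept. If you want set-style semantics instead, you can chain distinct() onto the result; a minimal sketch:
Python3
# Drop duplicate rows after the union (SQL UNION rather than UNION ALL)
deduped = df1.union(df2).distinct()
deduped.show()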
Method 2: Using unionByName()
unionByName() (available since Spark 2.3) merges two DataFrames by matching column names rather than column positions, so it works even when the columns appear in a different order.
Syntax: dataframe_1.unionByName(dataframe_2)
where,
- dataframe_1 is the first dataframe
- dataframe_2 is the second dataframe
Example:
Python3
# Union the two dataframes using unionByName()
result1 = df1.unionByName(df2)
# display
result1.show()
Output:
+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+
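Since Spark 3.1, unionByName() also accepts an allowMissingColumns flag, which fills columns present in only one DataFrame with nulls instead of raising an error. A minimal sketch with two hypothetical DataFrames (df_a and df_b are illustrative, not part of the example above):
Python3
# Hypothetical DataFrames with different column sets
df_a = spark.createDataFrame([('Ram', 3000)], ['Name', 'salary'])
df_b = spark.createDataFrame([('Ani', 'F')], ['Name', 'Gender'])

# Requires Spark 3.1+; missing columns are filled with null
merged = df_a.unionByName(df_b, allowMissingColumns=True)
merged.show()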
Method 3: Using functools
The functools module provides higher-order functions that act on or extend other functions and callables without completely rewriting them. Here, functools.reduce() applies union() pairwise across a list of DataFrames.
Syntax:
functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
where,
- df1 and df2 are the DataFrames combined pairwise by reduce
- dfs is the list of DataFrames to union
We define a helper unionAll() that reduces over a list of DataFrames with union(); selecting df1.columns from each right-hand DataFrame keeps the column order aligned with the left-hand one. We then pass the two DataFrames created above to unionAll() and display the result.
Example:
Python3
import functools

# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(
        df2.select(df1.columns)), dfs)

# unionAll
result3 = unionAll([df1, df2])
result3.show()
Output:
+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
|  Mohi|1991-04-01|     M|  3000|
|   Ani|2000-05-19|     F|  4300|
|Shipta|1978-09-05|     F|  4200|
| Jessy|1967-12-01|     F|  4010|
| kanne|1980-02-17|     F|  1200|
+------+----------+------+------+
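Because unionAll() reduces over a list, the same helper works unchanged for three or more DataFrames. A minimal sketch, where df3 is a hypothetical third DataFrame added only for illustration:
Python3
# df3 is a hypothetical third dataframe with the same schema
df3 = spark.createDataFrame(
    [('Asha', '1995-06-21', 'F', 2500)],
    ['Name', 'DOB', 'Gender', 'salary'])

# reduce applies union() pairwise across the whole list
result4 = unionAll([df1, df2, df3])
result4.show()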