How to Join on Multiple Columns in PySpark?
In this article, we will discuss how to join on multiple columns in a PySpark DataFrame using Python.
Let's create the first dataframe:
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"), (3, "bobby")]
# specify column names
columns = ['ID1', 'NAME1']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
dataframe.show()
Output:
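With the sample data above, dataframe.show() prints:

+---+------+
|ID1| NAME1|
+---+------+
|  1|sravan|
|  2|ojsawi|
|  3| bobby|
+---+------+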
Let's create the second dataframe:
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"),
        (3, "bobby"),
        (4, "rohith"), (5, "gnanesh")]
# specify column names
columns = ['ID2', 'NAME2']
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
dataframe1.show()
Output:
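With the five sample rows, dataframe1.show() prints:

+---+-------+
|ID2|  NAME2|
+---+-------+
|  1| sravan|
|  2| ojsawi|
|  3|  bobby|
|  4| rohith|
|  5|gnanesh|
+---+-------+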
We can join on multiple columns by passing the join() function a combined condition, chaining the per-column equality checks with conditional operators (a short sketch after the syntax shows why each check needs its own parentheses):
Syntax: dataframe.join(dataframe1, (dataframe.column1 == dataframe1.column1) & (dataframe.column2 == dataframe1.column2))
where,
- dataframe is the first dataframe
- dataframe1 is the second dataframe
- column1 is the first matching column in both the dataframes
- column2 is the second matching column in both the dataframes
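Note that each comparison must sit in its own parentheses: in Python, & and | bind more tightly than ==, so leaving the parentheses out raises an error instead of building the combined condition. The sketch below is a minimal illustration reusing the dataframe and dataframe1 built above; col() is an equivalent way to reference the key columns, and the commented line shows the list-of-names shorthand, which applies only when both dataframes use identical key column names (not the case here).
Python3
# a minimal sketch, reusing dataframe and dataframe1 from above
from pyspark.sql.functions import col

# each comparison needs its own parentheses:
# & binds more tightly than ==, so omitting them is an error
condition = (col("ID1") == col("ID2")) & (col("NAME1") == col("NAME2"))
dataframe.join(dataframe1, condition).show()

# if both sides shared the same key names, e.g. ['ID', 'NAME'],
# passing a list of column names would join on all of them and
# keep a single copy of each key column:
# dataframe.join(dataframe1, ['ID', 'NAME']).show()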
Example 1: PySpark code to join the two dataframes on multiple columns (id and name)
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"), (3, "bobby")]
# specify column names
columns = ['ID1', 'NAME1']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"), (3, "bobby"),
        (4, "rohith"), (5, "gnanesh")]
# specify column names
columns = ['ID2', 'NAME2']
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
# join based on ID and name column
dataframe.join(dataframe1, (dataframe.ID1 == dataframe1.ID2)
               & (dataframe.NAME1 == dataframe1.NAME2)).show()
Output:
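The inner join keeps the three rows whose id and name match in both dataframes (row order in the joined output may vary):

+---+------+---+------+
|ID1| NAME1|ID2| NAME2|
+---+------+---+------+
|  1|sravan|  1|sravan|
|  2|ojsawi|  2|ojsawi|
|  3| bobby|  3| bobby|
+---+------+---+------+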
Example 2: Join using the or (|) operator
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"), (3, "bobby")]
# specify column names
columns = ['ID1', 'NAME1']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# list of employee data
data = [(1, "sravan"), (2, "ojsawi"), (3, "bobby"),
        (4, "rohith"), (5, "gnanesh")]
# specify column names
columns = ['ID2', 'NAME2']
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
# join based on ID and name column
dataframe.join(dataframe1, (dataframe.ID1 == dataframe1.ID2)
               | (dataframe.NAME1 == dataframe1.NAME2)).show()
Output:
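With this sample data the | condition matches the same three rows, because no pair of rows agrees on only one of the two columns (row order may vary):

+---+------+---+------+
|ID1| NAME1|ID2| NAME2|
+---+------+---+------+
|  1|sravan|  1|sravan|
|  2|ojsawi|  2|ojsawi|
|  3| bobby|  3| bobby|
+---+------+---+------+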