PySpark Join Types: Join Two DataFrames
In this article, we will see how to join two DataFrames in PySpark using Python. A join is used to combine two or more DataFrames based on a common column.
Syntax: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "type")
where,
- dataframe1 is the first DataFrame
- dataframe2 is the second DataFrame
- column_name is the column on which the two DataFrames are matched
- type is the type of join to perform
Create the first DataFrame for demonstration:
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

dataframe.show()
Output:
Create the second DataFrame for demonstration:
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

dataframe1.show()
Output:
Inner Join
An inner join joins the two PySpark DataFrames on a key column that is common to both DataFrames.
Syntax: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "inner")
Example:
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# inner join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "inner").show()
Output:
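Note that because the join condition compares dataframe.ID with dataframe1.ID, the joined result contains both ID columns. PySpark's join() also accepts a column name (or a list of names) as the join condition, which keeps a single copy of the key column; the following is a minimal sketch reusing dataframe and dataframe1 from the example above.
Python3
# joining on the column name instead of an explicit
# condition keeps a single ID column in the result
dataframe.join(dataframe1, ["ID"], "inner").show()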
Full Outer Join
This join combines the two DataFrames and keeps all rows, matched and unmatched, from both. We can perform this join in three ways.
Syntax:
- outer: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "outer")
- full: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "full")
- fullouter: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "fullouter")
Example 1: Using the outer keyword
In this example, we perform an outer join based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# full outer join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "outer").show()
Output:
Example 2: Using the full keyword
In this example, we perform a full outer join using the full keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# full outer join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "full").show()
Output:
Example 3: Using the fullouter keyword
In this example, we perform a full outer join using the fullouter keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# full outer join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "fullouter").show()
Output:
Left Join
This join returns all rows from the first DataFrame and only the rows from the second DataFrame that match the first. We can perform this type of join with the left and leftouter keywords.
Syntax:
- left: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "left")
- leftouter: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "leftouter")
Example 1: Performing a left join
In this example, we perform a left join using the left keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# left join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "left").show()
Output:
Example 2: Performing a leftouter join
In this example, we perform a left join using the leftouter keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# left join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "leftouter").show()
Output:
Right Join
This join returns all rows from the second DataFrame and only the rows from the first DataFrame that match the second. We can perform this type of join with the right and rightouter keywords.
Syntax:
- right: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "right")
- rightouter: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "rightouter")
Example 1: Performing a right join
In this example, we perform a right join using the right keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# right join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "right").show()
Output:
Example 2: Performing a rightouter join
In this example, we perform a right join using the rightouter keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# right join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "rightouter").show()
Output:
Left Semi Join
This join returns only the rows from the first DataFrame that have a match in the second DataFrame, and the result keeps only the columns of the first DataFrame.
Syntax: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "leftsemi")
Example: In this example, we perform a leftsemi join using the leftsemi keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# leftsemi join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "leftsemi").show()
Output:
Left Anti Join
This join returns only the rows from the first DataFrame that have no matching record in the second DataFrame, keeping only the first DataFrame's columns.
Syntax: dataframe1.join(dataframe2, dataframe1.column_name == dataframe2.column_name, "leftanti")
Example: In this example, we perform a leftanti join using the leftanti keyword, based on the ID column in both DataFrames.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# leftanti join on two dataframes
dataframe.join(dataframe1,
               dataframe.ID == dataframe1.ID,
               "leftanti").show()
Output:
SQL Expressions
We can perform all of the join types above with SQL expressions, in which the join type is specified inside the expression. To do this, we first have to create temporary views for the DataFrames.
Syntax: dataframe.createOrReplaceTempView("name")
where,
- dataframe is the input DataFrame
- name is the view name
We can now use spark.sql() to perform joins on these views.
Syntax: spark.sql("select * from dataframe1, dataframe2 where dataframe1.column_name == dataframe2.column_name")
where,
- dataframe1 is the first view
- dataframe2 is the second view
- column_name is the column to join on
Example 1: In this example, we join the two DataFrames on the ID column.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# create a view for dataframe named student
dataframe.createOrReplaceTempView("student")

# create a view for dataframe1 named department
dataframe1.createOrReplaceTempView("department")

# use a sql expression to join where the ID columns match
spark.sql(
    "select * from student, department \
    where student.ID == department.ID").show()
Output:
We can also perform the above joins with the following SQL expression:
Syntax: spark.sql("select * from dataframe1 JOIN_TYPE dataframe2 ON dataframe1.column_name == dataframe2.column_name")
where JOIN_TYPE refers to any of the join types above
Example 2: Performing an inner join on the ID column using an expression
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# list of employee data
data1 = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]

# specify column names
columns = ['ID', 'salary', 'department']

# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data1, columns)

# create a view for dataframe named student
dataframe.createOrReplaceTempView("student")

# create a view for dataframe1 named department
dataframe1.createOrReplaceTempView("department")

# inner join on id column using sql expression
spark.sql(
    "select * from student INNER JOIN \
    department on student.ID == department.ID").show()
Output:
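To illustrate the JOIN_TYPE substitution described above, any of the earlier join keywords can be spelled out in SQL; the following sketch assumes the student and department views created in the previous example and swaps in a LEFT JOIN.
Python3
# left join on the id column using a sql expression
spark.sql(
    "select * from student LEFT JOIN \
    department on student.ID == department.ID").show()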