How to add a new column to a PySpark DataFrame?
In this article, we will discuss how to add a new column to a PySpark DataFrame.
Creating the first dataframe for demonstration:
Here we create a sample dataframe that we will use throughout the article to demonstrate each method.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
dataframe.show()
Output:
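dataframe.show() should print a table along these lines (exact column padding may vary by Spark version):
+--+-------+---------+
|ID|   NAME|  Company|
+--+-------+---------+
| 1| sravan|company 1|
| 2| ojaswi|company 1|
| 3| rohith|company 2|
| 4|sridevi|company 1|
| 5|  bobby|company 1|
+--+-------+---------+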
Method 1: Add a new column with a constant value
In this approach, we add a new column with a constant value by passing the lit() function as an argument to the withColumn() function. Here, lit() is available in the pyspark.sql.functions module.
Syntax:
dataframe.withColumn("column_name", lit(value))
where,
- dataframe is the input PySpark dataframe
- column_name is the new column to be added
- value is the constant value to be assigned to that column
Example:
In this example, we add a column named salary with the value 34000 to the dataframe above, using the withColumn() function with lit() as its argument.
Python3
# importing module
import pyspark
# import lit function
from pyspark.sql.functions import lit
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# Add a column named salary with value as 34000
dataframe.withColumn("salary", lit(34000)).show()
Output:
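All rows receive the same constant, so the result should look roughly like:
+--+-------+---------+------+
|ID|   NAME|  Company|salary|
+--+-------+---------+------+
| 1| sravan|company 1| 34000|
| 2| ojaswi|company 1| 34000|
| 3| rohith|company 2| 34000|
| 4|sridevi|company 1| 34000|
| 5|  bobby|company 1| 34000|
+--+-------+---------+------+
Note that withColumn() returns a new DataFrame rather than modifying the existing one; to keep the column, assign the result, e.g. dataframe = dataframe.withColumn("salary", lit(34000)).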
Method 2: Add a column based on another column of the DataFrame
In this approach, we add a new column derived from an existing column of the given dataframe.
Example 1: Using the withColumn() method
Here the user specifies the existing column inside the withColumn() function and passes the required parameters.
Syntax:
dataframe.withColumn("column_name", dataframe.existing_column)
where,
- dataframe is the input dataframe
- column_name is the new column
- existing_column is the column that already exists
In this example, we add a column named salary computed from the ID column multiplied by 2300, using the withColumn() method.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# Add a column named salary from ID column with multiply of 2300
dataframe.withColumn("salary", dataframe.ID*2300).show()
Output:
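Since ID holds strings, Spark implicitly casts it to a double for the multiplication, so the new column should contain values such as 2300.0:
+--+-------+---------+-------+
|ID|   NAME|  Company| salary|
+--+-------+---------+-------+
| 1| sravan|company 1| 2300.0|
| 2| ojaswi|company 1| 4600.0|
| 3| rohith|company 2| 6900.0|
| 4|sridevi|company 1| 9200.0|
| 5|  bobby|company 1|11500.0|
+--+-------+---------+-------+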
Example 2: Using concat_ws()
In this example, we concatenate two existing columns into a new column by importing the concat_ws() method from the pyspark.sql.functions module.
Syntax:
dataframe.withColumn("column_name", concat_ws("Separator", "existing_column1", "existing_column2"))
where,
- dataframe is the input dataframe
- column_name is the new column name
- existing_column1 and existing_column2 are the two columns combined with the Separator to produce the values of the new column
- Separator is the string placed between the values of the two columns
Example:
In this example, we add a column named Details built from the NAME and Company columns separated by "-".
Python3
# importing module
import pyspark
# import concat_ws function
from pyspark.sql.functions import concat_ws
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# Add a column named Details from Name and Company columns separated by -
dataframe.withColumn("Details", concat_ws("-", "NAME", 'Company')).show()
Output:
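Each Details value is the NAME and Company values joined by the separator, so the output should look roughly like:
+--+-------+---------+-----------------+
|ID|   NAME|  Company|          Details|
+--+-------+---------+-----------------+
| 1| sravan|company 1| sravan-company 1|
| 2| ojaswi|company 1| ojaswi-company 1|
| 3| rohith|company 2| rohith-company 2|
| 4|sridevi|company 1|sridevi-company 1|
| 5|  bobby|company 1|  bobby-company 1|
+--+-------+---------+-----------------+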
Method 3: Add a column when it does not exist on the DataFrame
In this approach, we add a column only when it does not already exist, by combining the lit() function with an if condition that checks for the column.
Syntax:
if 'column_name' not in dataframe.columns:
    dataframe.withColumn("column_name", lit(value))
where,
- dataframe.columns is used to get the column names
Example:
In this example, we add a salary column with the value 34000, using an if condition together with the withColumn() and lit() functions.
Python3
# importing module
import pyspark
# import lit function
from pyspark.sql.functions import lit
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# add salary column by checking its existence
if 'salary' not in dataframe.columns:
    dataframe.withColumn("salary", lit(34000)).show()
Output:
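Because salary does not yet exist, the if branch runs and the result should match Method 1's output:
+--+-------+---------+------+
|ID|   NAME|  Company|salary|
+--+-------+---------+------+
| 1| sravan|company 1| 34000|
| 2| ojaswi|company 1| 34000|
| 3| rohith|company 2| 34000|
| 4|sridevi|company 1| 34000|
| 5|  bobby|company 1| 34000|
+--+-------+---------+------+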
Method 4: Add a column to a DataFrame using select()
In this approach, we add a column by calling the select() function with lit() as its argument. Note that select() returns only the selected columns, so the original columns are not included in the result.
Syntax:
dataframe.select(lit(value).alias("column_name"))
where,
- dataframe is the input dataframe
- column_name is the new column
Example:
In this example, we add a salary column with the constant value 34000 using the select() function with lit() as its argument.
Python3
# importing module
import pyspark
# import lit function
from pyspark.sql.functions import lit
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# add salary column with constant value - 34000
dataframe.select(lit(34000).alias("salary")).show()
Output:
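select() returns only the selected column, so the output should contain just salary:
+------+
|salary|
+------+
| 34000|
| 34000|
| 34000|
| 34000|
| 34000|
+------+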
Method 5: Add a column to a DataFrame using a SQL expression
In this approach, the column is added with a SQL expression passed to the sql() function. Before that, we have to create a temporary view, and from that view we add and select the column.
Syntax:
dataframe.createOrReplaceTempView("name")
spark.sql("select 'value' as column_name from view")
where,
- dataframe is the input dataframe
- name is the temporary view name
- the sql() function takes the SQL expression as input to add the column
- column_name is the new column name
- value is the column value
Example:
Add a new column named salary with the value 34000.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# create view
dataframe.createOrReplaceTempView("view")
# add new column named salary with 34000 value
spark.sql("select '34000' as salary from view").show()
Output:
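The SQL literal '34000' is selected once per row of the view, so the output should again contain only the salary column:
+------+
|salary|
+------+
| 34000|
| 34000|
| 34000|
| 34000|
| 34000|
+------+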
Method 6: Add a column value based on a condition
In this approach, we check conditions with the when() function inside the withColumn() method and assign column values based on existing column values. To do this, import when() from pyspark.sql.functions, which adds a specific value under a given condition.
Syntax:
dataframe.withColumn("column_name",
    when((dataframe.column_name condition1), lit("value1")).
    when((dataframe.column_name condition2), lit("value2")).
    ...
    when((dataframe.column_name conditionN), lit("valueN")).
    otherwise(lit("value")))
where,
- column_name is the new column name
- condition1 is a condition checked by when(); rows that match it are assigned value1 via lit()
- otherwise is the keyword used when no condition is satisfied
Example:
In this example, we add a new column named salary: the value 34000 when the name is sravan, 31000 when the name is ojaswi or bobby, and 78000 otherwise, using the when() and withColumn() functions.
Python3
# importing module
import pyspark
# import when and lit function
from pyspark.sql.functions import when, lit
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# add a new column named salary:
# 34000 when the name is sravan,
# 31000 when the name is ojaswi or bobby,
# 78000 otherwise
dataframe.withColumn("salary",
                     when((dataframe.NAME == "sravan"), lit("34000")).
                     when((dataframe.NAME == "ojaswi") |
                          (dataframe.NAME == "bobby"), lit("31000")).
                     otherwise(lit("78000"))).show()
Output:
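With these conditions, sravan gets 34000, ojaswi and bobby get 31000, and the remaining rows fall through to 78000, so the output should look roughly like:
+--+-------+---------+------+
|ID|   NAME|  Company|salary|
+--+-------+---------+------+
| 1| sravan|company 1| 34000|
| 2| ojaswi|company 1| 31000|
| 3| rohith|company 2| 78000|
| 4|sridevi|company 1| 78000|
| 5|  bobby|company 1| 31000|
+--+-------+---------+------+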