
📅  Last modified: 2022-05-13 01:54:54.562000             🧑  Author: Mango

How to add a new column to a PySpark DataFrame?

In this article, we will discuss how to add a new column to a PySpark DataFrame.

Create the first DataFrame for demonstration:

Here we create a sample DataFrame that we will use throughout the article to demonstrate each method.

Python3
# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
dataframe.show()




Output:

Method 1: Add a new column with a constant value

In this method, we add a new column with a constant value by passing the lit() function as an argument to the withColumn() function. Here, lit() is available in the pyspark.sql.functions module.

Syntax:

dataframe.withColumn("column_name", lit(value))

Where,

  • dataframe is the input PySpark DataFrame
  • column_name is the new column to be added
  • value is the constant value to be assigned to the column

Example:

In this example, we add a salary column with the value 34000 to the above DataFrame, using the withColumn() function with lit() as its argument.

Python3

# importing module
import pyspark
 
# import lit function
from pyspark.sql.functions import lit
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# Add a column named salary with value as 34000
dataframe.withColumn("salary", lit(34000)).show()

Output:

Method 2: Add a column based on another column of the DataFrame

With this approach, the user can add a new column based on an existing column of the given DataFrame.

Example 1: Using the withColumn() method

Here, the user refers to an existing column inside the withColumn() function and passes the required arguments.

Syntax:

dataframe.withColumn("column_name", dataframe.existing_column)

Where,

  • dataframe is the input DataFrame
  • column_name is the new column
  • existing_column is a column that already exists in the DataFrame

In this example, we use the withColumn() method to add a column named salary, computed by multiplying the ID column by 2300:

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# Add a column named salary from ID column with multiply of 2300
dataframe.withColumn("salary", dataframe.ID*2300).show()

Output:

Example 2: Using concat_ws()

In this example, we concatenate two existing columns into a new column by importing the concat_ws() method from the pyspark.sql.functions module.

Syntax:

dataframe.withColumn("column_name", concat_ws("separator", "existing_column1", "existing_column2"))

Where,

  • dataframe is the input DataFrame
  • column_name is the new column name
  • existing_column1 and existing_column2 are the two columns whose values are joined with the separator to produce the new column's value
  • separator is the delimiter placed between the values of the two columns

Example:

In this example, we add a column named Details built from the NAME and Company columns, separated by "-":

Python3

# importing module
import pyspark
 
# import concat_ws function
from pyspark.sql.functions import concat_ws
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# Add a column named Details from Name and Company columns separated by -
dataframe.withColumn("Details", concat_ws("-", "NAME", 'Company')).show()

Output:

Method 3: Add a column only if it does not exist in the DataFrame

In this method, the user adds a column with the lit() function only after checking, via an if condition, that the column does not already exist.

Syntax:

if 'column_name' not in dataframe.columns:
   dataframe.withColumn("column_name",lit(value))

Where,

  • dataframe.columns is used to get the list of column names

Example:

In this example, we add a salary column with the value 34000, using an if condition together with the withColumn() and lit() functions.

Python3

# importing module
import pyspark
 
# import lit function
from pyspark.sql.functions import lit
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# add salary column by checking its existence
if 'salary' not in dataframe.columns:
    dataframe.withColumn("salary", lit(34000)).show()

Output:

Method 4: Add a column to the DataFrame using select()

In this method, the user calls the select() function with lit() and alias() to add the column. Note that select() returns only the columns it is given, so the result here contains just the new column; to keep the existing columns, select them as well.

Syntax:

dataframe.select(lit(value).alias("column_name"))

Where,

  • dataframe is the input DataFrame
  • column_name is the new column

Example:

In this example, we add a salary column with the constant value 34000, using the select() function with lit() as its argument.

Python3

# importing module
import pyspark
 
# import lit function
from pyspark.sql.functions import lit
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# add salary column with constant value - 34000
dataframe.select(lit(34000).alias("salary")).show()

Output:

Method 5: Add a column to the DataFrame using a SQL expression

In this method, the user adds the column through a SQL expression passed to the sql() function. Before that, we have to create a temporary view, and from that view we add and select the column.

Syntax:

dataframe.createOrReplaceTempView("name")
spark.sql("select 'value' as column_name from name")

Where,

  • dataframe is the input DataFrame
  • name is the temporary view name
  • the sql() function takes the SQL expression as input to add the column
  • column_name is the new column name
  • value is the column value

Example:

Add a new column named salary with the value 34000:

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# create view
dataframe.createOrReplaceTempView("view")
 
# add new column named salary with 34000 value
spark.sql("select '34000' as salary from view").show()

Output:

Method 6: Add column values based on a condition

In this method, the user checks a condition with the when() function inside the withColumn() method and assigns column values based on existing column values. So we have to import when() from pyspark.sql.functions to add a specific column based on the given condition.

Syntax:

dataframe.withColumn("column_name", when(condition1, lit(value1)).otherwise(lit(value2)))

Where,

  • column_name is the new column name
  • condition1 is the condition checked by when(); value1 is assigned via lit() when it holds
  • otherwise is the branch used when no preceding condition matches

Example:

In this example, we add a new column named salary: the value 34000 when the name is sravan, 31000 when the name is ojaswi or bobby, and 78000 otherwise, using the when() and withColumn() functions.

Python3

# importing module
import pyspark
 
# import when and lit function
from pyspark.sql.functions import when, lit
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# add a new column named salary
# add value 34000 when name is sravan
# add value 31000 when name is ojaswi or bobby
# otherwise add 78000
dataframe.withColumn("salary",
                     when((dataframe.NAME == "sravan"), lit("34000")).
                     when((dataframe.NAME == "ojaswi") | (
                         dataframe.NAME == "bobby"), lit("31000"))
                     .otherwise(lit("78000"))).show()

Output: