How to add the sum of columns as a new column in a PySpark DataFrame?
In this article, we will see how to add a new column to a PySpark DataFrame using various methods. Specifically, we will create a new column that holds the sum of all the numeric values in each row. Let us discuss the various ways to add this sum as a new column.
But first, let us create a DataFrame for the demonstration.
Python3
# import SparkSession from pyspark
from pyspark.sql import SparkSession

# build the SparkSession with the
# application name "sum as new_col"
spark = SparkSession.builder.appName("sum as new_col").getOrCreate()

# create the Spark DataFrame
data = spark.createDataFrame([('x', 5, 3, 7),
                              ('Y', 3, 3, 6),
                              ('Z', 5, 2, 6)],
                             ['A', 'B', 'C', 'D'])

# print the schema of the DataFrame
data.printSchema()

# show the DataFrame
data.show()
Now let us look at the different methods for adding this new column to a Spark DataFrame.
Method 1: Using a UDF
In this method, we define a function that takes the column values as arguments and returns their sum, and register it as a UDF (User-Defined Function), the mechanism Spark provides for creating reusable functions. It lets us build new functions tailored to our requirements, which is why it is called a user-defined function.
We specify the return data type of the UDF and define the function that returns the sum of all values in the row.
Python3
# import the functions as F from pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# define sum_col, which returns the row sum
def sum_col(b, c, d):
    col_sum = b + c + d
    return col_sum

# register the UDF with IntegerType as its return type
new_f = F.udf(sum_col, IntegerType())

# call the UDF to create the new
# column "Udf_method_sum"
df_col1 = data.withColumn("Udf_method_sum",
                          new_f("B", "C", "D"))

# print the schema of the DataFrame and show it
df_col1.printSchema()
df_col1.show()
Output:
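Since the UDF wraps an ordinary Python function, it can be sanity-checked on plain values before registering it with F.udf (a small check added here for illustration, not part of the original example):

```python
# the same function that is later wrapped with F.udf
def sum_col(b, c, d):
    col_sum = b + c + d
    return col_sum

# sanity-check on plain integers before registering as a UDF
print(sum_col(5, 3, 7))  # 15
```

This catches logic errors early, without waiting for a full Spark job to fail.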
Method 2: Using the expr() function
expr(str) is another function in pyspark; it takes a mathematical expression as a string argument. For example, to get the row sum, pass the argument as "n1+n2+n3+n4...", where n1, n2, n3, ... are the column names.
Python3
# import expr from the functions
from pyspark.sql.functions import expr

# create the new column with withColumn(),
# giving the column name
# 'expression_method_sum' and expr(),
# which takes the expression as a string
df_col1 = df_col1.withColumn('expression_method_sum',
                             expr("B + C + D"))

# print the schema of the
# DataFrame and show it
df_col1.printSchema()
df_col1.show()
Output:
Method 3: Using a SQL query
In this method, we first create a temporary view of the DataFrame with createTempView(). The lifetime of this temporary view is tied to the lifetime of the SparkSession.
After creating the view, we select from it with a SQL query, which spark.sql() takes as a string.
Python3
# create the temporary view of the
# DataFrame as "temp"; createTempView
# returns None, so do not assign its
# result back to df_col1
df_col1.createTempView("temp")

# create the new column "sql_method"
# with a SQL query
df_col1 = spark.sql('select *, B+C+D as sql_method from temp')

# print the schema of the DataFrame
# and show the DataFrame
df_col1.printSchema()
df_col1.show()
Output:
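When the table is wide, the SQL expression need not hard-code "B+C+D"; the query string can be assembled from a list of column names. The helper below (a hypothetical addition for illustration, not part of the original example) builds such a query:

```python
def build_sum_sql(table, sum_cols, alias):
    # join the column names into an expression like "B + C + D"
    expr = " + ".join(sum_cols)
    return f"select *, {expr} as {alias} from {table}"

query = build_sum_sql("temp", ["B", "C", "D"], "sql_method")
print(query)  # select *, B + C + D as sql_method from temp
# the resulting string would then be passed to spark.sql(query)
```

Be careful to build the string only from trusted, known column names, since plain string formatting offers no protection against SQL injection.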
Method 4: Using select()
Select from the table with the select() method: the first argument is a column name, or '*' to select the whole table; the second argument is the sum expression over the column names, with alias() giving the newly created column its name.
Python3
# select everything from table df_col1 and
# create the new sum column "select_method_sum"
df_col1 = df_col1.select('*',
                         (df_col1["B"] + df_col1["C"] + df_col1["D"])
                         .alias("select_method_sum"))

# show the schema and the table
df_col1.printSchema()
df_col1.show()
Output:
Method 5: Using withColumn()
withColumn() is a DataFrame transformation function used to change values, change a data type, and create a new column from existing columns.
This function takes the name of the new sum column and the column expression as arguments.
Python3
# using the withColumn function; reference the
# columns of df_col1 itself, since df_col1 was
# rebuilt by spark.sql and no longer shares
# lineage with the original `data` DataFrame
df_col1 = df_col1.withColumn('withcolum_Sum',
                             df_col1['B'] + df_col1['C'] + df_col1['D'])

# print the schema of the
# DataFrame and show it
df_col1.printSchema()
df_col1.show()
Output:
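Each of the methods above hard-codes the three column names B, C, and D. As a closing sketch (an addition to the original article, not one of its methods), the sum expression can be built for an arbitrary column list with functools.reduce. The minimal example below folds plain numbers; the same fold transfers directly to pyspark Column objects because Column overloads `+`:

```python
from functools import reduce
from operator import add

def sum_expr(columns):
    # fold a non-empty list of addable objects into a single sum;
    # plain numbers here, pyspark Column objects in a real DataFrame
    return reduce(add, columns)

# with plain numbers standing in for Column expressions:
print(sum_expr([5, 3, 7]))  # 15

# in PySpark this would look like (sketch, not run here):
# df = df.withColumn("row_sum",
#                    sum_expr([df[c] for c in ["B", "C", "D"]]))
```

This keeps the column list in one place, so adding or removing a column from the sum is a one-line change.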