📜  如何在Python编写 Spark UDF(用户定义函数)?

📅  最后修改于: 2022-05-13 01:55:14.180000             🧑  作者: Mango

如何在Python编写 Spark UDF(用户定义函数)?

在本文中,我们将讨论 UDF(用户定义函数)以及如何在Python Spark 中编写这些函数。 UDF,基本上代表用户定义的函数。 UDF 将允许我们直接在Python中的数据帧和 SQL 数据库中应用函数,而无需单独注册它们。它还可以帮助我们通过 UDF 将函数应用于数据框列来为数据框创建新列,因此它将扩展我们的数据框功能。它可以使用 udf() 方法创建。

udf():该方法将使用 lambda函数来循环数据,其参数将接受 lambda函数,并且 lambda 值将成为该函数的参数,我们希望将其作为 UDF。

示例 Pyspark 数据框

让我们创建一个数据框,这个数据框的主题将是学生的姓名,以及他/她在 100 分中的原始分数。

Python3
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf
  
spark = SparkSession.builder.appName('UDF PRACTICE').getOrCreate()
  
cms = ["Name","RawScore"]
data =  [("Jack", "79"),
        ("Mira", "80"),
        ("Carter", "90")]
                     
df = spark.createDataFrame(data=data,schema=cms)
  
df.show()


Python3
def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
              
    return result


Python3
NumberUDF = udf(lambda m: Converter(m))


Python3
df.withColumn("Special Names", NumberUDF("Name")).show()


Python3
@udf(returnType=StringType())
def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
        else:
            result += q
    return result
  
  
df.withColumn("Special Names", Converter("Name")) \
    .show()


Python3
def SQRT(x):
    return float(math.sqrt(x)+3)


Python3
UDF_marks = udf(lambda m: SQRT(m),FloatType())


Python3
df.select("Name","RawScore", UDF_marks("RawScore")).show()


输出:



创建示例函数

现在,我们必须创建一个函数。因此,为了便于理解,我们将创建一个简单的函数来拆分列并检查该列中的遍历对象是否等于 'J'(Capital J) 或 'C'(Capital C) 或 'M '(Capital M),所以它会转换那个单词的第二个字母,用它的大写版本。这段代码的实现是:

蟒蛇3

def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
              
    return result

从 Sample函数制作 UDF

现在,我们将其转换为我们的 UDF函数,这将反过来减少我们在数据上的工作量。为此,我们在 UDF 中使用 lambda。

蟒蛇3

NumberUDF = udf(lambda m: Converter(m))

在数据帧上使用 UDF

我们将在这里使用的下一件事是 withcolumn(),请记住 withcolumn() 将返回一个完整的数据帧。所以我们将只使用我们现有的 df 数据帧,并且返回的值将只存储在 df 中(基本上我们将附加它)。

蟒蛇3

df.withColumn("Special Names", NumberUDF("Name")).show()

输出:



注意:我们也可以一步完成所有这些事情。

带注释的 UDF

现在,一个简短而聪明的方法是使用“注释”(或装饰器) 。这将以更少的步骤创建我们的 UDF函数。对于这一点,我们所有的UDF函数的前面做用@符号(装饰),并给予其参数部分函数,即分配的返回类型为Intergertype(),StringType(),等的返回类型

蟒蛇3

@udf(returnType=StringType())
def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
        else:
            result += q
    return result
  
  
df.withColumn("Special Names", Converter("Name")) \
    .show()

输出:

例子:

现在,让我们假设学校有一个评分方案,它根据平方根加 3 来校准学生的分数(即他们将校准 15 分中的分数)。所以,我们将定义一个UDF函数,这次我们将指定返回类型。即浮点数据类型。所以,这个函数的声明将是——

蟒蛇3

def SQRT(x):
    return float(math.sqrt(x)+3)

现在,我们将定义一个 udf,其返回类型将始终为 float,即我们强制函数以及 UDF 仅以浮点数形式为我们提供结果。这个函数的定义是——



蟒蛇3

UDF_marks = udf(lambda m: SQRT(m),FloatType())

udf,FloatType() 的第二个参数将始终强制 UDF函数仅以浮动类型返回结果。现在,我们将在数据框中的 RawScore 列上使用我们的 udf函数UDF_marks,并将生成一个名为“RawScore”的新列,这将是该列的默认命名。这个代码看起来像 -

蟒蛇3

df.select("Name","RawScore", UDF_marks("RawScore")).show()

输出: