📌  相关文章
📜  在使用 handleInvalid = "error" 组装一行时遇到 null.考虑从数据集中删除空值或使用 handleInvalid = "keep" 或 "skip". (1)

📅  最后修改于: 2023-12-03 15:07:57.881000             🧑  作者: Mango

Spark SQL null值处理

当在使用Spark SQL处理数据时,我们经常会遇到null值。在这种情况下,我们需要考虑如何处理这些null值。Spark SQL提供了几种处理null值的方法。

删除空值

使用 na.drop() 方法可以删除含有null值的行或列。例如,假设我们有一个名为mydata的数据集,其中有一个名为age的列含有null值:

mydata = spark.createDataFrame([(1, 20), (2, None), (3, 30), (4, None)], ['id', 'age'])

mydata.show()

+---+----+
| id| age|
+---+----+
|  1|  20|
|  2|null|
|  3|  30|
|  4|null|
+---+----+

我们可以使用 na.drop() 方法删除这些null值所在的行:

mydata.na.drop().show()

+---+---+
| id|age|
+---+---+
|  1| 20|
|  3| 30|
+---+---+

该方法还可以接受一个参数,用于指定删除方式。默认情况下,它会删除所有含有null值的行,也就是 how='any'。我们也可以指定只删除全部为null值的行,即 how='all'

keep和skip

在处理null值时,还有两个参数可供选择:handleInvalid='keep'handleInvalid='skip'

handleInvalid='keep' 时,Spark SQL将会保留null值,并且将它们视为另一类。例如,如果我们使用下面的代码处理上面的数据集:

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').fit(mydata).transform(mydata)

result.show()

+---+----+----+
| id| age| age|
+---+----+----+
|  1|  20|20.0|
|  2|null|25.0|
|  3|  30|30.0|
|  4|null|25.0|
+---+----+----+

我们可以看到,null值已被替换为平均值25,并且另一列age被添加到数据集中以保存null值(这就是 handleInvalid='keep' 的作用)。

handleInvalid='skip' 时,Spark SQL将会跳过含有null值的行。例如,如果我们使用下面的代码处理上面的数据集:

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').setHandleInvalid('skip').fit(mydata).transform(mydata)

result.show()

+---+----+
| id| age|
+---+----+
|  1|  20|
|  3|  30|
+---+----+

我们可以看到,含有null值的行已被跳过。

error

还有一种处理null值的方法是将 handleInvalid='error',这将导致出现null值时出现错误。例如,如果我们使用下面的代码处理上面的数据集:

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').setHandleInvalid('error').fit(mydata).transform(mydata)

result.show()

则会出现下面的错误:

Py4JJavaError: An error occurred while calling o92.transform.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 16, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (double) => double)
...
Caused by: java.lang.NullPointerException
...

因为我们指定了 handleInvalid='error',在出现null值时程序会抛出异常。

总结

这就是Spark SQL处理null值的方法:删除空值、keep、skip和error。通过使用这些方法,我们可以更好地处理含有null值的数据集。