📅  最后修改于: 2023-12-03 15:07:57.881000             🧑  作者: Mango
当在使用Spark SQL处理数据时,我们经常会遇到null值。在这种情况下,我们需要考虑如何处理这些null值。Spark SQL提供了几种处理null值的方法。
使用 na.drop()
方法可以删除含有null值的行或列。例如,假设我们有一个名为mydata
的数据集,其中有一个名为age
的列含有null值:
mydata = spark.createDataFrame([(1, 20), (2, None), (3, 30), (4, None)], ['id', 'age'])
mydata.show()
+---+----+
| id| age|
+---+----+
| 1| 20|
| 2|null|
| 3| 30|
| 4|null|
+---+----+
我们可以使用 na.drop()
方法删除这些null值所在的行:
mydata.na.drop().show()
+---+---+
| id|age|
+---+---+
| 1| 20|
| 3| 30|
+---+---+
该方法还可以接受一个参数,用于指定删除方式。默认情况下,它会删除所有含有null值的行,也就是 how='any'
。我们也可以指定只删除全部为null值的行,即 how='all'
。
在处理null值时,还有两个参数可供选择:handleInvalid='keep'
和handleInvalid='skip'
。
当 handleInvalid='keep'
时,Spark SQL将会保留null值,并且将它们视为另一类。例如,如果我们使用下面的代码处理上面的数据集:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').fit(mydata).transform(mydata)
result.show()
+---+----+----+
| id| age| age|
+---+----+----+
| 1| 20|20.0|
| 2|null|25.0|
| 3| 30|30.0|
| 4|null|25.0|
+---+----+----+
我们可以看到,null值已被替换为平均值25,并且另一列age
被添加到数据集中以保存null值(这就是 handleInvalid='keep'
的作用)。
当 handleInvalid='skip'
时,Spark SQL将会跳过含有null值的行。例如,如果我们使用下面的代码处理上面的数据集:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').setHandleInvalid('skip').fit(mydata).transform(mydata)
result.show()
+---+----+
| id| age|
+---+----+
| 1| 20|
| 3| 30|
+---+----+
我们可以看到,含有null值的行已被跳过。
还有一种处理null值的方法是将 handleInvalid='error'
,这将导致出现null值时出现错误。例如,如果我们使用下面的代码处理上面的数据集:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['age'], outputCols=['age'])
result = imputer.setStrategy('mean').setHandleInvalid('error').fit(mydata).transform(mydata)
result.show()
则会出现下面的错误:
Py4JJavaError: An error occurred while calling o92.transform.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 16, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (double) => double)
...
Caused by: java.lang.NullPointerException
...
因为我们指定了 handleInvalid='error'
,在出现null值时程序会抛出异常。
这就是Spark SQL处理null值的方法:删除空值、keep、skip和error。通过使用这些方法,我们可以更好地处理含有null值的数据集。