📜  如何迭代 pyspark 数据框 - Python (1)

📅  最后修改于: 2023-12-03 15:09:16.811000             🧑  作者: Mango

如何迭代 pyspark 数据框 - Python

在 PySpark 中,我们经常需要迭代数据框(DataFrame)中的每一行数据。本文将介绍如何在 PySpark 中迭代数据框。

迭代数据框

PySpark 中的数据框是由多行数据组成的二维分布式数据集。要迭代数据框,我们可以使用 foreach 函数,该函数将 Python 函数应用于数据集中的每个元素。

以下是一个简单的示例:

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# 创建数据框
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Age"])

# 定义函数
def print_row(row):
    print(row)

# 遍历数据框
df.foreach(print_row)

在上面的示例中,我们创建了一个包含 NameAge 列的数据框。然后我们定义了一个 print_row 函数,该函数接收一个数据框的行作为参数并打印该行。最后,我们使用 foreach 函数遍历数据框并调用 print_row 函数。

运行上面的代码可以看到类似以下的输出结果:

Row(Name='Alice', Age=1)
Row(Name='Bob', Age=2)
Row(Name='Cathy', Age=3)
使用 map 函数

除了使用 foreach 函数遍历数据框,我们还可以使用 map 函数将数据框中的每一行映射到一个新的数据结构或值。map 函数返回一个新的 RDD,该 RDD 包含与当前 RDD 相同数量的元素,但每个元素都经过了指定的转换。

以下是一个示例:

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# 创建数据框
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Age"])

# 定义函数
def get_name(row):
    return row[0]

# 使用 map 函数
names = df.rdd.map(get_name)

# 打印结果
for name in names.collect():
    print(name)

在上面的示例中,我们创建了一个包含 NameAge 列的数据框。然后我们定义了一个 get_name 函数,该函数接收一个行作为参数并返回该行的 Name 列。接下来,我们使用 map 函数返回一个包含数据框中所有 Name 列值的新 RDD。最后,我们使用 collect 函数将 RDD 转换为列表并遍历打印列表。

运行上面的代码可以看到类似以下的输出结果:

Alice
Bob
Cathy
使用 withColumn 函数

使用 withColumn 函数可以向数据框添加新的列,然后我们就可以迭代这些新的列。

以下是一个示例:

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# 创建数据框
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Age"])

# 添加新列
df2 = df.withColumn("UpperName", df["Name"].upper())

# 遍历新列
for row in df2.collect():
    print(row.UpperName)

在上面的示例中,我们创建了一个包含 NameAge 列的数据框。然后我们使用 withColumn 函数添加一个名为 UpperName 的新列,该列值为 Name 列的大写形式。最后,我们使用 collect 函数将数据框转换为列表并遍历新列并打印结果。

运行上面的代码可以看到类似以下的输出结果:

ALICE
BOB
CATHY
总结

以上就是如何在 PySpark 中迭代数据框的方法。我们可以使用 foreach 函数遍历数据框,使用 map 函数将数据框的每一行映射到一个新的数据结构或值,或者使用 withColumn 函数添加新的列来迭代数据框。