📅  最后修改于: 2023-12-03 15:25:14.662000             🧑  作者: Mango
在 PySpark 中,DataFrame 是一个分布式的数据集合。它类似于关系型数据库中的表,但是分布式存储在不同的节点上,可以并行处理数据。有时候,我们需要将 DataFrame 中的某一列转换为 Python 列表进行后续的数据处理。以下是 PySpark DataFrame 列转换为 Python 列表的几种方式。
如果 DataFrame 中的数据不是太大,可以使用 collect()
函数获取所有数据,再使用 map()
函数对某一列进行转换。
from pyspark.sql.functions import col
# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ['Name', 'ID'])
# 将 ID 列转换为 Python 列表
id_list = df.select(col('ID')).rdd.map(lambda x: x[0]).collect()
print(id_list)
# 输出: [1, 2, 3]
这里使用 select()
函数选择需要转换的列,然后使用 rdd
将 DataFrame 转换为 RDD,再使用 map()
函数将每个行数据中的第一个元素(即 ID 列)提取出来,最后使用 collect()
函数将数据收集到本地。
需要注意的是,当 DataFrame 中的数据较大时,这种方式会产生数据倾斜或者内存不足等问题,因此需要慎重使用。
如果 DataFrame 中的数据不是特别大,但是使用上述方法存在性能问题,可以考虑使用 toPandas()
函数将 DataFrame 转换为 Pandas DataFrame,再使用 Pandas 的方法进行转换。
import pandas as pd
# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["Name", "ID"])
# 将 ID 列转换为 Python 列表
id_list = pd.DataFrame(df.select(col("ID")).collect(), columns=["ID"])["ID"].tolist()
print(id_list)
# 输出: [1, 2, 3]
这里使用 select()
函数选择需要转换的列,然后使用 collect()
函数将数据收集到本地,并将其转换为 Pandas DataFrame。最后,使用 Pandas 的方法将数据转换为 Python 列表。
需要注意的是,这种方式会将数据收集到 Driver 端,如果数据量过大会导致内存不足问题。
如果 DataFrame 中的数据非常大,并且无法使用上述两种方式进行转换,可以考虑使用 toLocalIterator()
函数逐行读取数据。
# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["Name", "ID"])
# 将 ID 列转换为 Python 列表
id_list = []
for row in df.select(col("ID")).toLocalIterator():
id_list.append(row[0])
print(id_list)
# 输出: [1, 2, 3]
这里使用 select()
函数选择需要转换的列,然后使用 toLocalIterator()
函数逐行读取数据,并将每行数据中的第一个元素(即 ID 列)添加到列表中。
需要注意的是,这种方式会逐行读取数据,如果数据量过大会导致性能问题,因此需要慎重使用。
以上是几种将 PySpark DataFrame 列转换为 Python 列表的方式。需要根据具体的情况选择合适的方法。