PySpark partitionBy() Method


PySpark partitionBy() is used to partition a DataFrame based on column values while writing it to disk/file system. When you write a DataFrame to disk by calling partitionBy(), PySpark splits the records based on the partition column and stores the data for each partition in its own sub-directory.

PySpark partitioning is a way to split a large dataset into smaller datasets based on one or more partition keys. You can also create partitions on multiple columns with partitionBy(); simply pass the columns you want to partition on as arguments to this method.

Syntax: partitionBy(self, *cols)
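
Before turning to the real dataset, here is a minimal, hedged sketch of the call pattern. The toy DataFrame and the output path demo_out are assumptions made purely for illustration:

Python3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('demo').getOrCreate()

# hypothetical toy DataFrame, used only to show the layout
data = [("Ind", "Batsman"), ("Aus", "Bowler"), ("Ind", "Bowler")]
toy_df = spark.createDataFrame(data, ["Team", "Speciality"])

# one sub-directory per distinct Team value is created,
# e.g. demo_out/Team=Ind and demo_out/Team=Aus
toy_df.write.partitionBy("Team").mode("overwrite").csv("demo_out")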

Let's create a DataFrame by reading a CSV file. You can find the dataset Cricket_data_set_odi.csv at this link.

Create the DataFrame for demonstration:



Python3
# import the required module
from pyspark.sql import SparkSession

# create a SparkSession and give the app a name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# read the CSV file into a DataFrame, treating
# the first row as the header
df = spark.read.option("header", True) \
          .csv("Cricket_data_set_odi.csv")

# display the schema
df.printSchema()



Output:
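
By default the CSV reader loads every column as a string. If you also want Spark to detect the column types, a hedged variant is to enable the reader's standard inferSchema option:

Python3
# variant: also infer the column types while reading
df = spark.read.option("header", True) \
               .option("inferSchema", True) \
               .csv("Cricket_data_set_odi.csv")

df.printSchema()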

PySpark partitionBy() with One Column:

From the above DataFrame, we will use Team as the partition key for the example below:

Python3

import os

# partition the output on the Team column
df.write.option("header", True) \
        .partitionBy("Team") \
        .mode("overwrite") \
        .csv("Team")

# our DataFrame has a total of 9 different teams,
# so this creates 9 sub-directories. Each sub-directory
# is named after the partition column and its value
# (partition column=value).
print(sorted(os.listdir("Team")))

Output:
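
When the partitioned output is later read back through its base directory, Spark performs partition discovery and rebuilds the Team column from the Team=<value> directory names. A short sketch; the filter value "Ind" is an assumption based on the directory listing above:

Python3
# read the base path back: the "Team" column is
# reconstructed from the partition directory names
partitioned_df = spark.read.option("header", True).csv("Team")

# a filter on the partition column only scans the
# matching sub-directory (partition pruning)
partitioned_df.filter(partitioned_df.Team == "Ind").show()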

PySpark partitionBy() with Multiple Columns:

You can also create partitions on multiple columns using PySpark partitionBy(). Simply pass the columns you want to partition on as arguments to this method.



From the above DataFrame, we will use Team and Speciality as the partition keys for the example below.

Python3

import os

# from the above DataFrame, we will use Team and
# Speciality as the partition keys for this example
df.write.option("header", True) \
        .partitionBy("Team", "Speciality") \
        .mode("overwrite") \
        .csv("Team-Speciality")

# inside each "Team=<value>" directory there is one
# "Speciality=<value>" sub-directory per value
print(os.listdir("Team-Speciality/Team=Ind"))

Output:
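
You can also point the reader directly at one partition directory. Since a partition column lives in the directory name rather than in the data files, Team will be absent from the result when you read below it. A hedged sketch, assuming the Team=Ind directory created above:

Python3
# read one Team partition directly; "Team" sits above the
# supplied path and is not recovered, while the
# "Speciality=<value>" sub-directories below it still are
ind_df = spark.read.option("header", True) \
              .csv("Team-Speciality/Team=Ind")

ind_df.show()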

Controlling the Number of Records per Partition File:

If you want to control the number of records per partition file, use the option maxRecordsPerFile. This is particularly helpful when your data is skewed (some partitions get very few records while others get a large number).

Python3

import os

# use maxRecordsPerFile to cap the number of records
# written to each part file within a partition
df.write.option("header", True) \
        .option("maxRecordsPerFile", 2) \
        .partitionBy("Team") \
        .mode("overwrite") \
        .csv("Team")

# list the partition directories that were created
print(sorted(os.listdir("Team")))

Output:
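
A related knob worth knowing: partitionBy() controls only the directory layout, while the number of part files inside each directory follows the DataFrame's in-memory partitioning. A common, hedged pattern for getting a single part file per Team directory is to repartition on the same column before writing:

Python3
# repartitioning on "Team" first typically yields one
# part file inside each Team=<value> directory
df.repartition("Team") \
  .write.option("header", True) \
  .partitionBy("Team") \
  .mode("overwrite") \
  .csv("Team")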