📜  PySpark Groupby(1)

📅  最后修改于: 2023-12-03 15:33:55.717000             🧑  作者: Mango

PySpark Groupby

PySpark Groupby是PySpark中对数据进行汇总处理的重要手段之一。它可以用来对数据进行分组,然后对每个分组进行汇总统计。本文将会介绍PySpark Groupby的基本用法和一些高级用法。

基本用法

我们以一个示例数据集"sales"来说明基本用法。该数据集包含了不同零售店的销售数据:

| Date | Store | Product | Sale | |------------|---------|---------|------| | 2021-01-01 | Store A | Apple | 10 | | 2021-01-01 | Store A | Pear | 5 | | 2021-01-01 | Store B | Apple | 20 | | 2021-01-01 | Store B | Pear | 15 | | 2021-01-02 | Store A | Apple | 8 | | 2021-01-02 | Store A | Pear | 6 | | 2021-01-02 | Store B | Apple | 18 | | 2021-01-02 | Store B | Pear | 13 |

我们要对这个数据集进行分组统计,以日期和店铺为分组,统计每个分组中销售额总和。首先,我们需要将数据读入PySpark中:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GroupByExample").getOrCreate()

sales = spark.createDataFrame([
    ('2021-01-01', 'Store A', 'Apple', 10),
    ('2021-01-01', 'Store A', 'Pear', 5),
    ('2021-01-01', 'Store B', 'Apple', 20),
    ('2021-01-01', 'Store B', 'Pear', 15),
    ('2021-01-02', 'Store A', 'Apple', 8),
    ('2021-01-02', 'Store A', 'Pear', 6),
    ('2021-01-02', 'Store B', 'Apple', 18),
    ('2021-01-02', 'Store B', 'Pear', 13)],
    ['Date', 'Store', 'Product', 'Sale'])

接着,我们可以使用PySpark Groupby对数据进行分组统计:

from pyspark.sql.functions import sum

sales.groupBy('Date', 'Store').agg(sum('Sale').alias('TotalSales')).show()

输出结果如下:

+----------+-------+----------+
|      Date|  Store|TotalSales|
+----------+-------+----------+
|2021-01-02|Store A|        14|
|2021-01-02|Store B|        31|
|2021-01-01|Store B|        35|
|2021-01-01|Store A|        15|
+----------+-------+----------+

我们可以看到,数据已经按照日期和店铺进行了分组,每组的销售额总和也已经被统计出来了。

高级用法

除了基本用法外,PySpark Groupby还有一些高级用法,可以更灵活地对数据进行处理。下面我们将介绍一些高级用法。

指定不同的汇总方法

PySpark Groupby默认使用sum函数对分组后的数据进行汇总统计,但我们也可以指定不同的汇总方法。下面我们进行一个例子,计算每个分组的最大值、最小值和平均值:

from pyspark.sql.functions import min, max, avg

sales.groupBy('Date', 'Store').agg(
    min('Sale').alias('MinSale'),
    max('Sale').alias('MaxSale'),
    avg('Sale').alias('AvgSale')
).show()

输出结果如下:

+----------+-------+-------+-------+------+
|      Date|  Store|MinSale|MaxSale|AvgSale|
+----------+-------+-------+-------+------+
|2021-01-02|Store A|      6|      8|   7.0|
|2021-01-02|Store B|     13|     18|  15.5|
|2021-01-01|Store B|     15|     20|  17.5|
|2021-01-01|Store A|      5|     10|   7.5|
+----------+-------+-------+-------+------+

我们可以看到,在每个分组中,最大值、最小值和平均值都已经被计算出来了。

自定义汇总函数

如果默认提供的汇总方法无法满足我们的需求,我们也可以自定义汇总函数来对数据进行处理。下面我们进行一个例子,计算不同店铺销售额的增长率:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def growth_rate(sales):
    first_day_sales = sales[0]
    last_day_sales = sales[-1]
    return (last_day_sales - first_day_sales) / first_day_sales

growth_rate_udf = udf(growth_rate, DoubleType())

sales.groupBy('Store').agg(
    growth_rate_udf(collect_list('Sale')).alias('GrowthRate')
).show()

输出结果如下:

+-------+------------------+
|  Store|        GrowthRate|
+-------+------------------+
|Store A|0.1090909090909091|
|Store B|              0.55|
+-------+------------------+

我们可以看到,每个店铺的销售额增长率都已经被计算出来了。

分组列类型转换

在分组前,我们可以先对需要分组的列进行类型转换。比如,我们可以将日期列的字符串类型转换为日期类型,以便更好地对数据进行处理。下面我们进行一个例子,将日期列从字符串类型转换为日期类型,然后按每个星期进行分组:

from pyspark.sql.functions import date_format, weekofyear

sales = sales.withColumn('Date', date_format('Date', 'yyyy-MM-dd').cast('date'))

sales.groupBy(weekofyear('Date')).agg(sum('Sale')).show()

输出结果如下:

+----------------+----------+
|weekofyear(Date)|sum(Sale) |
+----------------+----------+
|               1|        81|
+----------------+----------+

我们可以看到数据已经按每个星期进行了分组。注意,在此之前,我们将日期列从字符串类型转换为日期类型,并使用weekofyear函数获取了星期信息。这样我们就能够更好地对数据进行处理了。

总结

本文介绍了PySpark Groupby的基本用法和一些高级用法。使用PySpark Groupby可以方便地对数据进行分组统计,是数据分析领域中不可或缺的工具之一。如果您需要对大量数据进行分组统计处理,建议尝试使用PySpark Groupby。