📅  最后修改于: 2023-12-03 15:33:55.717000             🧑  作者: Mango
PySpark Groupby是PySpark中对数据进行汇总处理的重要手段之一。它可以用来对数据进行分组,然后对每个分组进行汇总统计。本文将会介绍PySpark Groupby的基本用法和一些高级用法。
我们以一个示例数据集"sales"来说明基本用法。该数据集包含了不同零售店的销售数据:
| Date | Store | Product | Sale | |------------|---------|---------|------| | 2021-01-01 | Store A | Apple | 10 | | 2021-01-01 | Store A | Pear | 5 | | 2021-01-01 | Store B | Apple | 20 | | 2021-01-01 | Store B | Pear | 15 | | 2021-01-02 | Store A | Apple | 8 | | 2021-01-02 | Store A | Pear | 6 | | 2021-01-02 | Store B | Apple | 18 | | 2021-01-02 | Store B | Pear | 13 |
我们要对这个数据集进行分组统计,以日期和店铺为分组,统计每个分组中销售额总和。首先,我们需要将数据读入PySpark中:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GroupByExample").getOrCreate()
sales = spark.createDataFrame([
('2021-01-01', 'Store A', 'Apple', 10),
('2021-01-01', 'Store A', 'Pear', 5),
('2021-01-01', 'Store B', 'Apple', 20),
('2021-01-01', 'Store B', 'Pear', 15),
('2021-01-02', 'Store A', 'Apple', 8),
('2021-01-02', 'Store A', 'Pear', 6),
('2021-01-02', 'Store B', 'Apple', 18),
('2021-01-02', 'Store B', 'Pear', 13)],
['Date', 'Store', 'Product', 'Sale'])
接着,我们可以使用PySpark Groupby对数据进行分组统计:
from pyspark.sql.functions import sum
sales.groupBy('Date', 'Store').agg(sum('Sale').alias('TotalSales')).show()
输出结果如下:
+----------+-------+----------+
| Date| Store|TotalSales|
+----------+-------+----------+
|2021-01-02|Store A| 14|
|2021-01-02|Store B| 31|
|2021-01-01|Store B| 35|
|2021-01-01|Store A| 15|
+----------+-------+----------+
我们可以看到,数据已经按照日期和店铺进行了分组,每组的销售额总和也已经被统计出来了。
除了基本用法外,PySpark Groupby还有一些高级用法,可以更灵活地对数据进行处理。下面我们将介绍一些高级用法。
PySpark Groupby默认使用sum函数对分组后的数据进行汇总统计,但我们也可以指定不同的汇总方法。下面我们进行一个例子,计算每个分组的最大值、最小值和平均值:
from pyspark.sql.functions import min, max, avg
sales.groupBy('Date', 'Store').agg(
min('Sale').alias('MinSale'),
max('Sale').alias('MaxSale'),
avg('Sale').alias('AvgSale')
).show()
输出结果如下:
+----------+-------+-------+-------+------+
| Date| Store|MinSale|MaxSale|AvgSale|
+----------+-------+-------+-------+------+
|2021-01-02|Store A| 6| 8| 7.0|
|2021-01-02|Store B| 13| 18| 15.5|
|2021-01-01|Store B| 15| 20| 17.5|
|2021-01-01|Store A| 5| 10| 7.5|
+----------+-------+-------+-------+------+
我们可以看到,在每个分组中,最大值、最小值和平均值都已经被计算出来了。
如果默认提供的汇总方法无法满足我们的需求,我们也可以自定义汇总函数来对数据进行处理。下面我们进行一个例子,计算不同店铺销售额的增长率:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
def growth_rate(sales):
first_day_sales = sales[0]
last_day_sales = sales[-1]
return (last_day_sales - first_day_sales) / first_day_sales
growth_rate_udf = udf(growth_rate, DoubleType())
sales.groupBy('Store').agg(
growth_rate_udf(collect_list('Sale')).alias('GrowthRate')
).show()
输出结果如下:
+-------+------------------+
| Store| GrowthRate|
+-------+------------------+
|Store A|0.1090909090909091|
|Store B| 0.55|
+-------+------------------+
我们可以看到,每个店铺的销售额增长率都已经被计算出来了。
在分组前,我们可以先对需要分组的列进行类型转换。比如,我们可以将日期列的字符串类型转换为日期类型,以便更好地对数据进行处理。下面我们进行一个例子,将日期列从字符串类型转换为日期类型,然后按每个星期进行分组:
from pyspark.sql.functions import date_format, weekofyear
sales = sales.withColumn('Date', date_format('Date', 'yyyy-MM-dd').cast('date'))
sales.groupBy(weekofyear('Date')).agg(sum('Sale')).show()
输出结果如下:
+----------------+----------+
|weekofyear(Date)|sum(Sale) |
+----------------+----------+
| 1| 81|
+----------------+----------+
我们可以看到数据已经按每个星期进行了分组。注意,在此之前,我们将日期列从字符串类型转换为日期类型,并使用weekofyear函数获取了星期信息。这样我们就能够更好地对数据进行处理了。
本文介绍了PySpark Groupby的基本用法和一些高级用法。使用PySpark Groupby可以方便地对数据进行分组统计,是数据分析领域中不可或缺的工具之一。如果您需要对大量数据进行分组统计处理,建议尝试使用PySpark Groupby。