📜  GroupBy and Filter Data in PySpark

📅  Last modified: 2022-05-13 01:55:48.023000             🧑  Author: Mango


In this article, we will group and filter data in PySpark using Python.

Let's create a DataFrame for demonstration:

Python3
# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
# display
dataframe.show()


Output:

In PySpark, groupBy() is used to collect identical data into groups on a PySpark DataFrame and to perform aggregate functions on the grouped data. When using this method, we have to apply one of the aggregate functions (such as sum(), min(), max(), avg() or count()) to the grouped result.
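
For reference, here is a minimal sketch (reusing the dataframe created above; the alias names are illustrative, not from the original article) of a few aggregate functions that can follow groupBy():

Python3

# sketch: several aggregates over the same grouping
# (alias names are illustrative, not from the article)
from pyspark.sql.functions import sum, min, max, avg, count

dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('total'),
    min('FEE').alias('lowest'),
    max('FEE').alias('highest'),
    avg('FEE').alias('average'),
    count('FEE').alias('students')
).show()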

Filtering data means removing rows based on a condition. In PySpark, we can filter with the filter() and where() functions.
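
As a quick illustration (a sketch, not one of the article's examples): on a DataFrame, where() is an alias for filter(), so the two calls below return the same rows.

Python3

# sketch: filter() and where() are interchangeable on a DataFrame
from pyspark.sql.functions import col

dataframe.filter(col('FEE') > 45000).show()
dataframe.where(col('FEE') > 45000).show()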

Method 1: Using filter()

This is used to filter the DataFrame based on a condition and return the resulting DataFrame.

Example 1: Filter the data by using sum() to keep the groups whose total FEE is greater than or equal to 56700

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import col and sum
from pyspark.sql.functions import col, sum
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
# groupBy DEPT with sum(), keeping the groups
# whose total FEE is greater than or equal to 56700
dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('Total Fee')
).filter(col('Total Fee') >= 56700).show()

Output:

Example 2: Filter with multiple conditions

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import col and sum
from pyspark.sql.functions import col, sum
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
# groupBy DEPT with sum(), keeping the groups whose
# total FEE is greater than or equal to 56700
# and less than or equal to 100000
dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('Total Fee')
).filter(
    col('Total Fee') >= 56700
).filter(
    col('Total Fee') <= 100000
).show()

Output:
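
As a side note, the two chained filter() calls above can equivalently be written as one filter() whose conditions are combined with & (each condition wrapped in parentheses); this variant is a sketch, not from the original article:

Python3

# sketch: combine both conditions in a single filter()
# (reuses the dataframe defined above)
from pyspark.sql.functions import col, sum

dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('Total Fee')
).filter(
    (col('Total Fee') >= 56700) & (col('Total Fee') <= 100000)
).show()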

Method 2: Using where()

This is used to select rows from the DataFrame based on a condition and return the resulting DataFrame.

Example 1: Filter the data by using sum() to keep the groups whose total FEE is greater than or equal to 56700

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import col and sum
from pyspark.sql.functions import col, sum
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
  
# groupBy DEPT with sum(), keeping the groups
# whose total FEE is greater than or equal to 56700
dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('Total Fee')
).where(col('Total Fee') >= 56700).show()

Output:

Example 2: Filter with multiple conditions

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import col and sum
from pyspark.sql.functions import col, sum
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
  
# groupBy DEPT with sum(), keeping the groups whose
# total FEE is greater than or equal to 56700
# and less than or equal to 100000
dataframe.groupBy('DEPT').agg(
    sum('FEE').alias('Total Fee')
).where(
    col('Total Fee') >= 56700
).where(
    col('Total Fee') <= 100000
).show()

Output:

Method 3: Using a Window function

A window function is used to partition the rows of a DataFrame by a column.

We can partition on the column that holds the group values and then apply aggregate functions such as min() and max() over each partition. In this way, we can filter the data from the PySpark DataFrame with a where clause.

Example: a PySpark program to keep only the rows with the maximum FEE within each department

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import functions
from pyspark.sql import functions as f
  
# import window module
from pyspark.sql import Window
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
  
# add each department's maximum FEE as a column, then
# keep only the rows whose FEE equals that maximum
dataframe.withColumn(
    'FEE max', f.max('FEE').over(Window.partitionBy('DEPT'))
).where(f.col('FEE') == f.col('FEE max')).show()

Output:
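
Note that max() over a partition keeps every row that ties for the maximum. If exactly one row per department is wanted, a common alternative (a sketch; the rn column name is our own) is row_number() over an ordered window:

Python3

# sketch: pick exactly one top-FEE row per department
# (the 'rn' column name is illustrative)
from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('DEPT').orderBy(f.col('FEE').desc())
dataframe.withColumn('rn', f.row_number().over(w)).where(
    f.col('rn') == 1).drop('rn').show()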

Method 4: Using a join

We can filter the data against the result of an aggregation by using a leftsemi join: this join returns only those rows of the left DataFrame that have a match in the aggregated result.

Example: Filter the rows with the maximum FEE from all departments

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import functions
from pyspark.sql import functions as f
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of student  data
data = [["1", "sravan", "IT", 45000],
        ["2", "ojaswi", "CS", 85000],
        ["3", "rohith", "CS", 41000],
        ["4", "sridevi", "IT", 56000],
        ["5", "bobby", "ECE", 45000],
        ["6", "gayatri", "ECE", 49000],
        ["7", "gnanesh", "CS", 45000],
        ["8", "bhanu", "Mech", 21000]
        ]
  
# specify column names
columns = ['ID', 'NAME', 'DEPT', 'FEE']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
  
# leftsemi join: keep only the rows whose FEE matches
# a department's maximum FEE
dataframe.join(
    dataframe.groupBy('DEPT').agg(f.max('FEE').alias('FEE')),
    on='FEE', how='leftsemi'
).show()

Output:
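
One caveat: because the join above matches on FEE alone, a row from a different department whose FEE happens to equal some department's maximum would also be kept. A stricter variant (our own sketch, not from the original article) joins on both DEPT and FEE:

Python3

# sketch: match on both DEPT and FEE to avoid
# cross-department coincidences
from pyspark.sql import functions as f

dataframe.join(
    dataframe.groupBy('DEPT').agg(f.max('FEE').alias('FEE')),
    on=['DEPT', 'FEE'], how='leftsemi'
).show()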