📜  How to slice a PySpark DataFrame into two row-wise DataFrames?

📅  Last modified: 2022-05-13 01:54:33.044000             🧑  Author: Mango


In this article, we will learn how to slice a PySpark DataFrame into two row-wise DataFrames. Slicing a DataFrame means taking a subset of it that contains all rows from one index up to another.

Method 1: Using the limit() and subtract() functions

In this method, we first create a PySpark DataFrame with the sample data using createDataFrame(). We then use the limit() function to take a fixed number of rows from the DataFrame and store them in a new variable. The syntax of the limit function is:

DataFrame.limit(num)

We then use the subtract() function to get the remaining rows from the original DataFrame. The syntax of the subtract function is:

DataFrame.subtract(other)

Python
# Importing PySpark
import pyspark
from pyspark.sql import SparkSession
  
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
  
  
# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]
  
# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']
  
# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)
  
# Getting the slices
# The first slice has 3 rows
df1 = df.limit(3)
  
# Getting the second slice by removing df1
# from df
df2 = df.subtract(df1)
  
# Printing the first slice
df1.show()
  
# Printing the second slice.
df2.show()


Output:
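One caveat of this approach: subtract() behaves like SQL's EXCEPT DISTINCT, so it removes duplicate rows and does not guarantee that the second slice keeps the original row order. A minimal sanity check, reusing the df, df1 and df2 variables from the snippet above, could look like this:

Python

# Hypothetical sanity check for Method 1: the two slices should be
# disjoint and together cover every row of the original DataFrame
# (this assumes df has no duplicate rows, as in the example data).
assert df1.count() + df2.count() == df.count()
assert df1.intersect(df2).count() == 0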

Method 2: Using the randomSplit() function

In this method, we again first create a PySpark DataFrame using createDataFrame(). We then use the randomSplit() function to get two slices of the DataFrame, specifying the fraction of rows that should end up in each slice. The rows are assigned to the two slices at random.

Python

# Importing PySpark
import pyspark
from pyspark.sql import SparkSession
  
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
  
  
# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]
  
# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']
  
# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)
  
# the first slice has 20% of the rows
# the second slice has 80% of the rows
# the data in both slices is selected randomly
df1, df2 = df.randomSplit([0.20, 0.80])
  
# Showing the first slice
df1.show()
  
# Showing the second slice
df2.show()

Output:
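Note that randomSplit() only approximates the requested fractions; on a DataFrame with just four rows, a 20/80 split may come out as 0 and 4, 1 and 3, or 2 and 2 rows. If reproducible slices are needed, a seed can be passed as the second argument, as in this small sketch that reuses the df variable from above:

Python

# Passing a seed makes the random split reproducible across runs
df1, df2 = df.randomSplit([0.20, 0.80], seed=42)

df1.show()
df2.show()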

Method 3: Using the collect() function

In this method, we will first create a PySpark DataFrame using createDataFrame(). We will then get a list of Row objects from the DataFrame using:

DataFrame.collect()

We then use Python list slicing to split that list into two lists of rows. Finally, we convert these two lists of rows back into PySpark DataFrames using createDataFrame().

Python

# Importing PySpark
import pyspark
from pyspark.sql import SparkSession
  
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
  
  
# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]
  
# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']
  
# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)
  
# getting the list of Row objects
row_list = df.collect()
  
# Slicing the Python List
part1 = row_list[:1]
part2 = row_list[1:]
  
# Converting the slices to PySpark DataFrames
slice1 = Spark_Session.createDataFrame(part1)
slice2 = Spark_Session.createDataFrame(part2)
  
# Printing the first slice
print('First DataFrame')
slice1.show()
  
# Printing the second slice
print('Second DataFrame')
slice2.show()

Output:
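Keep in mind that collect() pulls every row onto the driver as a plain Python list, so this method is only practical for DataFrames that fit in driver memory. Also, when converting the slices back, the original schema can be passed to createDataFrame() so that column names and types are preserved exactly; a minimal sketch, reusing the df, part1 and part2 variables from the code above:

Python

# Reusing the original schema keeps the column names and data types intact
slice1 = Spark_Session.createDataFrame(part1, schema=df.schema)
slice2 = Spark_Session.createDataFrame(part2, schema=df.schema)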

Method 4: Converting the PySpark DataFrame to a pandas DataFrame and slicing with iloc[]

In this method, we will first create a PySpark DataFrame using createDataFrame(). We will then convert it to a pandas DataFrame using toPandas(). We then slice the pandas DataFrame with iloc[], using the syntax:

DataFrame.iloc[start_index:end_index]

The row at end_index is not included in the slice. Finally, we convert the two DataFrame slices back into PySpark DataFrames using createDataFrame().

Python

# Importing PySpark and Pandas
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
  
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
  
  
# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]
  
# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']
  
# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)
  
# Converting DataFrame to pandas
pandas_df = df.toPandas()
  
# First DataFrame formed by slicing
df1 = pandas_df.iloc[:2]
  
# Second DataFrame formed by slicing
df2 = pandas_df.iloc[2:]
  
# Converting the slices to PySpark DataFrames
df1 = Spark_Session.createDataFrame(df1)
df2 = Spark_Session.createDataFrame(df2)
  
# Printing the first slice
print('First DataFrame')
df1.show()
  
# Printing the second slice
print('Second DataFrame')
df2.show()

Output:
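Like Method 3, this approach collects the entire DataFrame onto the driver via toPandas(), so it is only suitable for small data. On Spark 3.x, enabling Apache Arrow usually speeds up the Spark-to-pandas conversion; a short sketch, assuming the Arrow configuration key available in Spark 3.x:

Python

# Enable Arrow-based conversion between Spark and pandas (Spark 3.x)
Spark_Session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# toPandas() now uses Arrow where possible
pandas_df = df.toPandas()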