How to slice a PySpark DataFrame into two row-wise DataFrames?
In this article, we will learn how to split a PySpark DataFrame into two row-wise DataFrames. Slicing a DataFrame means taking a subset that contains all rows from one index to another.
Method 1: Using the limit() and subtract() functions
In this method, we first build a PySpark DataFrame with pre-coded data using createDataFrame(). We then use the limit() function to take a specific number of rows from the DataFrame and store them in a new variable. The syntax of the limit() function is:
Syntax : DataFrame.limit(num)
Returns : A DataFrame containing num rows.
We then use the subtract() function to get the remaining rows from the initial DataFrame. The syntax of the subtract() function is:
Syntax : DataFrame1.subtract(DataFrame2)
Returns : A new DataFrame containing rows in DataFrame1 but not in DataFrame2.
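Note that limit() does not guarantee which rows it returns unless the DataFrame is explicitly ordered, and subtract() also removes duplicate rows. The snippet below is a minimal sketch (assuming a DataFrame df like the one built in the full example that follows) that sorts by the 'Player' column first so the split is reproducible; the sort column is only illustrative.
Python
# A minimal sketch: order the rows first so limit()/subtract()
# produce a deterministic, reproducible split.
ordered_df = df.orderBy('Player')

# First slice: 3 rows of the ordered DataFrame
first_slice = ordered_df.limit(3)

# Second slice: every row that is not in the first slice
second_slice = ordered_df.subtract(first_slice)

first_slice.show()
second_slice.show()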
Python
# Importing PySpark
import pyspark
from pyspark.sql import SparkSession

# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()

# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]

# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']

# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)

# Getting the slices
# The first slice has 3 rows
df1 = df.limit(3)

# Getting the second slice by removing df1 from df
df2 = df.subtract(df1)

# Printing the first slice
df1.show()

# Printing the second slice
df2.show()
Output:
Method 2: Using the randomSplit() function
In this method, we first create a PySpark DataFrame using createDataFrame(). We then use the randomSplit() function to get two slices of the DataFrame, specifying the fractions of rows that should go into each slice. The rows are split randomly.
Syntax : DataFrame.randomSplit(weights, seed)
Parameters :
- weights : list of double values according to which the DataFrame is split.
- seed : the seed for sampling. This parameter is optional.
Returns : List of split DataFrames
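The weights do not need to sum to 1 (Spark normalizes them), and because rows are sampled randomly the slices may not contain exactly the requested fractions, especially for small DataFrames. Below is a small sketch (assuming the df created in the example that follows) that passes the optional seed so the split is reproducible; the seed value 42 is arbitrary.
Python
# A small sketch: passing a seed makes randomSplit() reproducible,
# i.e. the same seed always yields the same assignment of rows.
df1, df2 = df.randomSplit([0.20, 0.80], seed=42)

df1.show()
df2.show()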
Python
# Importing PySpark
import pyspark
from pyspark.sql import SparkSession

# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()

# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]

# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']

# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)

# The first slice has 20% of the rows and the second has 80%;
# the rows in each slice are selected randomly
df1, df2 = df.randomSplit([0.20, 0.80])

# Showing the first slice
df1.show()

# Showing the second slice
df2.show()
Output:
Method 3: Using the collect() function
In this method, we first create a PySpark DataFrame using createDataFrame(). We then get the list of Row objects of the DataFrame using:
DataFrame.collect()
We then use Python list slicing to obtain two lists of rows. Finally, we convert these two lists of rows back into PySpark DataFrames using createDataFrame().
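Keep in mind that collect() brings every row to the driver, so this approach only suits DataFrames small enough to fit in driver memory. As a small sketch (assuming the df built in the example below), the original schema can also be passed to createDataFrame() so that the slices keep exactly the same column names and types as the source:
Python
# A minimal sketch: slice the collected list of Row objects and rebuild
# the slices with the original schema so column names and types match.
row_list = df.collect()

# Split after the second row (the index is only illustrative)
part1, part2 = row_list[:2], row_list[2:]

slice1 = Spark_Session.createDataFrame(part1, schema=df.schema)
slice2 = Spark_Session.createDataFrame(part2, schema=df.schema)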
Python
# Importing PySpark
import pyspark
from pyspark.sql import SparkSession

# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()

# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]

# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']

# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)

# Getting the list of Row objects
row_list = df.collect()

# Slicing the Python list
part1 = row_list[:1]
part2 = row_list[1:]

# Converting the slices to PySpark DataFrames
slice1 = Spark_Session.createDataFrame(part1)
slice2 = Spark_Session.createDataFrame(part2)

# Printing the first slice
print('First DataFrame')
slice1.show()

# Printing the second slice
print('Second DataFrame')
slice2.show()
Output:
Method 4: Converting the PySpark DataFrame to a Pandas DataFrame and slicing it with iloc[]
In this method, we first create a PySpark DataFrame using createDataFrame(). We then convert it to a Pandas DataFrame using toPandas(). Next, we slice the Pandas DataFrame with iloc[], using the syntax:
DataFrame.iloc[start_index:end_index]
The row at end_index is not included. Finally, we convert the DataFrame slices back into PySpark DataFrames using createDataFrame().
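As a quick illustration of the iloc[] semantics (the row at end_index is excluded), the sketch below splits the Pandas DataFrame at its midpoint; using len(pandas_df) // 2 as the split point is only an example, and pandas_df refers to the converted DataFrame created in the code that follows.
Python
# A small sketch: iloc[start:end] excludes the row at end, so slicing
# at one index produces two non-overlapping halves that cover all rows.
split_at = len(pandas_df) // 2

first_half = pandas_df.iloc[:split_at]   # rows 0 .. split_at - 1
second_half = pandas_df.iloc[split_at:]  # rows split_at .. last row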
Python
# Importing PySpark and Pandas (pandas is required by toPandas())
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()

# Data filled in our DataFrame
rows = [['Lee Chong Wei', 69, 'Malaysia'],
        ['Lin Dan', 66, 'China'],
        ['Srikanth Kidambi', 9, 'India'],
        ['Kento Momota', 15, 'Japan']]

# Columns of our DataFrame
columns = ['Player', 'Titles', 'Country']

# DataFrame is created
df = Spark_Session.createDataFrame(rows, columns)

# Converting the DataFrame to Pandas
pandas_df = df.toPandas()

# First DataFrame formed by slicing
df1 = pandas_df.iloc[:2]

# Second DataFrame formed by slicing
df2 = pandas_df.iloc[2:]

# Converting the slices back to PySpark DataFrames
df1 = Spark_Session.createDataFrame(df1)
df2 = Spark_Session.createDataFrame(df2)

# Printing the first slice
print('First DataFrame')
df1.show()

# Printing the second slice
print('Second DataFrame')
df2.show()
Output: