How to Iterate over Rows and Columns in a PySpark DataFrame
In this article, we will discuss how to iterate over the rows and columns of a PySpark DataFrame.
Creating a DataFrame for demonstration:
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
dataframe.show()
Output:
Method 1: Using collect()
This method collects all rows and columns of the DataFrame to the driver as a list of Row objects, which we then traverse with a for loop, reading each column value from the current row.
Syntax:
for iterator in dataframe.collect():
    print(iterator["column_name"], ...)
where,
- dataframe is the input DataFrame
- iterator holds the current collected row
- column_name is the column whose value is read from each row
Example: here we iterate over all columns of the DataFrame using collect(), and inside the for loop we use iterator['column_name'] to get the column values.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# using collect
for i in dataframe.collect():
    # display
    print(i["ID"], i["NAME"], i["Company"])
Output:
Method 2: Using toLocalIterator()
This returns an iterator over all rows and columns of the DataFrame. It is similar to collect(), but it works through the underlying RDD, so we call it on dataframe.rdd, for example:
dataframe.rdd.toLocalIterator()
To visit every row and column, we loop over this iterator with a for loop.
Syntax:
for iterator in dataframe.rdd.toLocalIterator():
    print(iterator["column_name"], ...)
where,
- dataframe is the input DataFrame
- iterator holds the current row
- column_name is the column whose value is read from each row
Example: here we iterate over all columns of the DataFrame using toLocalIterator(), and inside the for loop we use iterator['column_name'] to get the column values.
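As a small, hedged sketch (reusing the dataframe built above): collect() returns an ordinary Python list of Row objects, so besides indexing a row by column name, its fields can also be read by attribute or by position.
Python3
# collect() gives a plain Python list of Row objects
rows = dataframe.collect()
first = rows[0]
# a Row field can be read by attribute ...
print(first.NAME)
# ... or by position (0 is the ID column here)
print(first[0])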
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# using toLocalIterator()
for i in dataframe.rdd.toLocalIterator():
    # display
    print(i["ID"], i["NAME"], i["Company"])
Output:
Method 3: Using iterrows()
This iterates over the rows one at a time. Before using it, we have to convert the PySpark DataFrame into a pandas DataFrame with the toPandas() method; iterrows() then yields an (index, row) pair for each row of the DataFrame.
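Note (a general property of toLocalIterator(), not specific to this example): unlike collect(), which brings the whole DataFrame to the driver at once, toLocalIterator() fetches the data one partition at a time, so the driver only needs enough memory for the largest partition. This makes it a safer choice when the DataFrame is large.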
Syntax: dataframe.toPandas().iterrows()
Example: in this example we iterate over the rows of all three columns using iterrows() inside a for loop.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# using iterrows()
for index, row in dataframe.toPandas().iterrows():
    # display the row values
    print(row[0], row[1], row[2])
Output:
Method 4: Using select()
The select() function is used to pick the columns we want, and the collect() function then returns the rows of just those columns, which we traverse with a for loop.
In other words, select() keeps only the mentioned columns and collect() fetches their row data from the DataFrame.
Syntax: dataframe.select("column1", ..., "column n").collect()
Example: here we select the ID and Name columns from the given DataFrame using the select() method and iterate over their rows.
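As a minimal sketch (still using the same dataframe): the pandas row returned by iterrows() is a Series, so the values can also be read by column name instead of by position, which is often more readable.
Python3
# convert once, then read fields by column name
pdf = dataframe.toPandas()
for index, row in pdf.iterrows():
    print(index, row["ID"], row["NAME"], row["Company"])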
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# select only the ID and NAME columns
for rows in dataframe.select("ID", "Name").collect():
    # display
    print(rows[0], rows[1])
Output:
Method 5: Using a list comprehension
Here a list comprehension acts as the loop that pulls a specific column's value out of every row: we collect the rows with collect() on the DataFrame's rdd, take the wanted column from each Row, and then print the values with a for loop.
Syntax: dataframe.rdd.collect()
Example: here we iterate over the rows of the NAME column.
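Note: the example passes "Name" even though the column was defined as NAME; this works because Spark resolves column names case-insensitively by default (spark.sql.caseSensitive is false), so "Name" matches the NAME column.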
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# select name column
for i in [j["NAME"] for j in dataframe.rdd.collect()]:
    print(i)
Output:
sravan
ojaswi
rohith
sridevi
bobby
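As a small sketch under the same assumptions (the dataframe defined above), the same pattern can pull several columns at once by building tuples inside the comprehension:
Python3
# collect (ID, NAME) pairs with a single list comprehension
pairs = [(j["ID"], j["NAME"]) for j in dataframe.rdd.collect()]
for emp_id, name in pairs:
    print(emp_id, name)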
Method 6: Using map()
In this method we use the map() function, which returns a new RDD by applying a function to every element of the given RDD. Here map() is used together with a lambda function to visit every row of the PySpark DataFrame.
Because map() only exists on RDDs, we first convert the DataFrame to an RDD with dataframe.rdd, then call map() with a lambda that picks the wanted columns from each row and store the resulting RDD in a variable; finally we convert that RDD back to a DataFrame with toDF(), passing it the new column names.
Syntax:
rdd = dataframe.rdd.map(lambda loop: (
    loop["column1"], ..., loop["columnn"]))
rdd.toDF(["column1", ..., "columnn"]).collect()
Example: here we iterate over the ID and NAME columns.
Python3
# importing module
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# list of employee data
data = [["1", "sravan", "company 1"],
["2", "ojaswi", "company 1"],
["3", "rohith", "company 2"],
["4", "sridevi", "company 1"],
["5", "bobby", "company 1"]]
# specify column names
columns = ['ID', 'NAME', 'Company']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# select id and name column using map()
rdd = dataframe.rdd.map(lambda loop: (
    loop["ID"], loop["NAME"]))
# convert to dataframe and display
rdd.toDF(["ID", "NAME"]).collect()
Output:
[Row(ID='1', NAME='sravan'),
Row(ID='2', NAME='ojaswi'),
Row(ID='3', NAME='rohith'),
Row(ID='4', NAME='sridevi'),
Row(ID='5', NAME='bobby')]
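As a hedged follow-up sketch (same dataframe and rdd as above): since toDF() gives back an ordinary DataFrame, collect() can be swapped for show() to display it as a table, or its rows can be looped over exactly as in the earlier methods.
Python3
# build the reduced DataFrame once
df2 = rdd.toDF(["ID", "NAME"])
# display it as a table instead of collecting Row objects
df2.show()
# or iterate over it like before
for r in df2.collect():
    print(r["ID"], r["NAME"])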