
How to Iterate over Rows and Columns in a PySpark DataFrame

In this article, we will discuss how to iterate over the rows and columns of a PySpark DataFrame.

Creating a DataFrame for demonstration:

Python3
# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
dataframe.show()


Output:
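With the employee data above, dataframe.show() should print a table roughly like the following (exact column widths may differ):

+---+-------+---------+
| ID|   NAME|  Company|
+---+-------+---------+
|  1| sravan|company 1|
|  2| ojaswi|company 1|
|  3| rohith|company 2|
|  4|sridevi|company 1|
|  5|  bobby|company 1|
+---+-------+---------+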

Method 1: Using collect()

This method collects all rows (and their columns) of the DataFrame to the driver, after which we can walk through them with a for loop. The iterator variable then gives access to each collected Row returned by collect().

Syntax:

for iterator in dataframe.collect():
    print(iterator["column_name"], ...)

where:

  • dataframe is the input DataFrame
  • iterator is used to loop over the collected rows
  • column_name is the column whose value is read from each row

Example: Here we iterate over all columns of the DataFrame using the collect() method; inside the for loop we use iterator['column_name'] to get each column value.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# using collect
for i in dataframe.collect():
    # display
    print(i["ID"], i["NAME"], i["Company"])

Output:
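With the data above, the loop should print one line per row, roughly:

1 sravan company 1
2 ojaswi company 1
3 rohith company 2
4 sridevi company 1
5 bobby company 1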

Method 2: Using toLocalIterator()

This returns an iterator over all rows of the underlying RDD. It is similar to collect(), but it works at the RDD level, so it is called on the DataFrame's rdd attribute. We use toLocalIterator() together with rdd like this:

dataframe.rdd.toLocalIterator()

To iterate over all rows and columns, we loop over this iterator in a for loop.

Syntax:

for iterator in dataframe.rdd.toLocalIterator():
    print(iterator["column_name"], ...)

where:

  • dataframe is the input DataFrame
  • iterator is used to loop over the rows
  • column_name is the column whose value is read from each row

Example: Here we iterate over all columns of the DataFrame using toLocalIterator(); inside the for loop we use iterator['column_name'] to get each column value.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# using toLocalIterator()
for i in dataframe.rdd.toLocalIterator():
   
    # display
    print(i["ID"], i["NAME"], i["Company"])

Output:
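Since the same rows are iterated, the printed output should match Method 1:

1 sravan company 1
2 ojaswi company 1
3 rohith company 2
4 sridevi company 1
5 bobby company 1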

Method 3: Using iterrows()

This iterates over the rows. Before using it we first have to convert the PySpark DataFrame into a pandas DataFrame with the toPandas() method; iterrows() then walks through the pandas DataFrame row by row.

Example: In this example we use iterrows() in a for loop to iterate over the rows of all three columns.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# using iterrows()
for index, row in dataframe.toPandas().iterrows():
    # display with index
    print(row[0], row[1], row[2])

Output:
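The printed output should again list the three column values of every row, roughly:

1 sravan company 1
2 ojaswi company 1
3 rohith company 2
4 sridevi company 1
5 bobby company 1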

Method 4: Using select()

The select() function is used to pick out the required columns. We then use collect() to fetch the row data and loop over it with a for loop, so only values from the selected columns are collected.

Example: Here we use the select() method to select the ID and NAME columns from the given DataFrame.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# select only the ID and NAME columns
for rows in dataframe.select("ID", "NAME").collect():
    # display
    print(rows[0], rows[1])

Output:
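Only the two selected columns are printed, roughly:

1 sravan
2 ojaswi
3 rohith
4 sridevi
5 bobby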

Method 5: Using a list comprehension

Here a list comprehension acts as a loop that pulls a particular column's value out of every row; we call collect() on the RDD to fetch the rows, and a for loop then prints the values from the given column.

Example: Here we iterate over the values in the NAME column.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# select name column
for i in [j["NAME"] for j in dataframe.rdd.collect()]:
    print(i)

Output:

sravan
ojaswi
rohith
sridevi
bobby

Method 6: Using map()

In this method we use the map() function, which returns a new RDD from the given RDD. map() is used together with a lambda function to iterate over every row of the PySpark DataFrame.

To iterate over every row with map(), we first have to convert the PySpark DataFrame into an RDD, because map() only runs on RDDs. We then call map() with a lambda function that processes each row, store the resulting RDD in a variable, and finally convert that RDD back into a DataFrame with toDF() by passing it the schema (the column names).

Syntax:

rdd = dataframe.rdd.map(lambda loop: (
    loop["column1"], ..., loop["columnN"]))
rdd.toDF(["column1", ..., "columnN"]).collect()

Example: Here we iterate over the ID and NAME columns.

Python3

# importing module
import pyspark
 
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
 
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
 
# list  of employee data
data = [["1", "sravan", "company 1"],
        ["2", "ojaswi", "company 1"],
        ["3", "rohith", "company 2"],
        ["4", "sridevi", "company 1"],
        ["5", "bobby", "company 1"]]
 
# specify column names
columns = ['ID', 'NAME', 'Company']
 
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
 
# select id and name column using map()
rdd = dataframe.rdd.map(lambda loop: (
    loop["ID"], loop["NAME"]))
 
# convert to dataframe and display
rdd.toDF(["ID", "NAME"]).collect()

Output:

[Row(ID='1', NAME='sravan'),
Row(ID='2', NAME='ojaswi'),
Row(ID='3', NAME='rohith'),
Row(ID='4', NAME='sridevi'),
Row(ID='5', NAME='bobby')]
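
collect() only returns the list of Row objects shown above. A minimal sketch of displaying the converted DataFrame as a table instead, reusing the rdd variable from the example, would be:

# display the converted DataFrame in tabular form instead of collecting Row objects
rdd.toDF(["ID", "NAME"]).show()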