PySpark collect() – Retrieve Data from a DataFrame
collect() is an action available on both RDDs and DataFrames that retrieves data from the DataFrame. It gathers all elements (rows) from every partition and brings them to the driver node/program.
In this article, we will learn how to retrieve data from a DataFrame using the collect() action.
Syntax: df.collect()
where df is the DataFrame.
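Because collect() brings every row back to the driver as a Python list of Row objects, it should only be used when the result comfortably fits in driver memory. Below is a minimal sketch (not part of the original examples) of what the call returns, assuming a small DataFrame named df with a "State" column; for large DataFrames, bounded actions such as take(n) or limit(n).collect() are safer.
Python
# minimal sketch: what collect() hands back on the driver
# (assumes a small DataFrame `df` with a "State" column)
rows = df.collect()          # a Python list of pyspark.sql.Row objects
print(type(rows))            # <class 'list'>
print(rows[0]["State"])      # fields are accessible by column name

# for large DataFrames, prefer a bounded number of rows, e.g.:
# df.take(5) or df.limit(5).collect()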
Example 1: Retrieving all the data from the DataFrame using collect().
After creating the DataFrame, we retrieve all of its data by calling df.collect(), which returns an array (Python list) of Row objects. The output below shows the schema of the DataFrame and the DataFrame that was created.
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":

    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # creating the dataframe using createDataFrame function
    df = spark.createDataFrame(rd_df, schema_lst)

    # printing schema of the dataframe and showing the dataframe
    df.printSchema()
    df.show()

    # retrieving the data from the dataframe using collect()
    df2 = df.collect()
    print("Retrieved Data is:-")
    print(df2)
Output:
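Since collect() returns Row objects, it is often convenient to convert them into plain Python structures once they are on the driver. A small illustrative sketch (not part of the original example), assuming df is the DataFrame built above:
Python
# sketch: turning collected Row objects into plain Python dicts
rows = df.collect()
as_dicts = [row.asDict() for row in rows]   # Row.asDict() is a standard Row method
print(as_dicts[0])   # e.g. {'State': 'Uttar Pradesh', 'Cases': 122000, ...}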
Example 2: Retrieving the data of a specific row using collect().
After creating the DataFrame, we retrieve the data of the 0th row by writing print(df.collect()[0][0:]). Here we index into the result of collect(): the first [0] selects the row, and the second [0:] slices its columns, where the colon (:) with no bounds keeps every column. In short, we retrieve row 0 with all of its column values.
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":

    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # creating the dataframe using createDataFrame function
    df = spark.createDataFrame(rd_df, schema_lst)

    # printing schema of the dataframe and showing the dataframe
    df.printSchema()
    df.show()

    print("Retrieved Data is:-")

    # retrieving data from the 0th row (all columns)
    print(df.collect()[0][0:])
Output:
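If only the first row is needed, collecting the whole DataFrame is unnecessary. A hedged alternative sketch (not from the original example), using the standard first() action:
Python
# sketch: fetching just the 0th row without collecting everything
first_row = df.first()          # returns a single Row (or None for an empty DataFrame)
print(first_row)
print(first_row["State"], first_row["Cases"])   # fields by name as well as by position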
Example 3: Retrieving the data of multiple rows using collect().
After creating the DataFrame, we retrieve the data of the first three rows using the collect() action and a for loop, by writing for row in df.collect()[0:3]. After calling collect(), we slice the rows we want with [0:3]: 0 is the starting row, 3 is the ending row, and the colon (:) separates them.
The rows retrieved are therefore 0, 1 and 2; the last index, 3, is always excluded.
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":

    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # creating the dataframe using createDataFrame function
    df = spark.createDataFrame(rd_df, schema_lst)

    # showing the dataframe and schema
    df.printSchema()
    df.show()

    print("Retrieved Data is:-")

    # retrieving the first three rows using collect() and a for loop
    for row in df.collect()[0:3]:
        print(row["State"], ",", str(row["Cases"]), ",",
              str(row["Recovered"]), ",", str(row["Deaths"]))
Output:
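Slicing the result of collect() still pulls every row to the driver first. A hedged sketch of a lighter-weight way to get the same three rows, using the standard take() action:
Python
# sketch: retrieving only the first three rows without a full collect()
for row in df.take(3):          # take(n) returns at most n Row objects
    print(row["State"], ",", row["Cases"], ",",
          row["Recovered"], ",", row["Deaths"])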
Example 4: Retrieving the data of a specific column using collect().
After creating the DataFrame, we retrieve the data of the "Cases" column using collect() and a for loop. Iterating over df.collect() gives us the array of Row objects one row at a time, and from each row we retrieve and print only the "Cases" value by writing print(row["Cases"]).
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":

    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # creating the dataframe using createDataFrame function
    df = spark.createDataFrame(rd_df, schema_lst)

    # showing the dataframe and schema
    df.printSchema()
    df.show()

    print("Retrieved Data is:-")

    # retrieving data from the "Cases" column
    for row in df.collect():
        print(row["Cases"])
Output:
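When only one column is needed, it can be cheaper to project that column before collecting, so the other columns never travel to the driver. A hedged sketch using the standard select() transformation:
Python
# sketch: collecting a single column by selecting it first
cases = [row["Cases"] for row in df.select("Cases").collect()]
print(cases)   # e.g. [122000, 454000, 115000, 147000, 153000]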
Example 5: Retrieving the data of multiple columns using collect().
After creating the DataFrame, we retrieve the data of multiple columns: "State", "Recovered" and "Deaths".
To do this, we first get the array of Row objects with df.collect(), then iterate over it with a for loop. Each iteration gives us one row, from which we retrieve and print the "State", "Recovered" and "Deaths" values by writing print(row["State"], ",", row["Recovered"], ",", row["Deaths"]).
Python
# importing necessary libraries
from pyspark.sql import SparkSession

# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk

# function to create RDD
def create_RDD(sc_obj, data):
    df = sc_obj.parallelize(data)
    return df

if __name__ == "__main__":

    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]

    # calling function to create SparkSession
    spark = create_session()

    # creating spark context object
    sc = spark.sparkContext

    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)

    schema_lst = ["State", "Cases", "Recovered", "Deaths"]

    # creating the dataframe using createDataFrame function
    df = spark.createDataFrame(rd_df, schema_lst)

    # showing the dataframe and schema
    df.printSchema()
    df.show()

    print("Retrieved Data is:-")

    # retrieving data of the "State",
    # "Recovered" and "Deaths" columns
    for row in df.collect():
        print(row["State"], ",", row["Recovered"], ",", row["Deaths"])
Output:
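The same projection idea extends to several columns: selecting "State", "Recovered" and "Deaths" before collecting keeps the unused "Cases" column off the driver. A hedged sketch:
Python
# sketch: collecting only the "State", "Recovered" and "Deaths" columns
for row in df.select("State", "Recovered", "Deaths").collect():
    print(row["State"], ",", row["Recovered"], ",", row["Deaths"])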