Converting a PySpark RDD to a DataFrame

Last modified: 2022-05-13 01:55:33.256000 | Author: Mango


In this article, we will discuss how to convert an RDD to a DataFrame in PySpark. There are two ways to do the conversion:

  1. Using createDataFrame(rdd, schema)
  2. Using toDF(schema)

Before moving on to the conversion, let's first create an RDD.

Example:

Python
# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk
  
# function to create RDD
def create_RDD(sc_obj, data):
    # parallelize the local data into an RDD using the passed-in SparkContext
    df = sc_obj.parallelize(data)
    return df
  
  
if __name__ == "__main__":
  
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]
  
    # calling function to create SparkSession
    spark = create_session()
  
    # creating spark context object
    sc = spark.sparkContext
  
    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)
  
    # printing the type
    print(type(rd_df))




Output:



Method 1: Using the createDataFrame() function

After creating the RDD, we convert it to a DataFrame with the createDataFrame() function, passing in the RDD along with the schema defined for the DataFrame.

Syntax:

spark.createDataFrame(rdd, schema)

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  # parallelize the local data into an RDD using the passed-in SparkContext
  df = sc_obj.parallelize(data)
  return df
  
# function to convert RDD to dataframe
def RDD_to_df(spark,df,schema):
    
  # converting RDD to df using createDataframe()
  # in which we are passing RDD and schema of df
  df1 = spark.createDataFrame(df,schema)
  return df1
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # calling function to convert RDD to dataframe
  converted_df = RDD_to_df(spark,rd_df,schema_lst)
    
  # visualizing the schema and dataframe
  converted_df.printSchema()
  converted_df.show()

Output:
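The schema passed to createDataFrame() does not have to be a plain list of column names. Below is a minimal sketch (the field types are assumptions chosen to match the sample data, not part of the original example) that passes a StructType instead, so the column types are fixed explicitly rather than inferred:

Python

# a minimal sketch: createDataFrame() with an explicit StructType schema
# (field names and types are assumed for illustration)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("Corona_cases_statewise.com") \
    .getOrCreate()
sc = spark.sparkContext

# a single-row RDD reusing one record from the example data
rdd = sc.parallelize([("Kerala", 153000, 124000, 5259)])

# explicit schema: column names plus their types
schema = StructType([
    StructField("State", StringType(), True),
    StructField("Cases", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Deaths", IntegerType(), True)
])

df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show()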

Method 2: Using the toDF() function

After creating the RDD, we convert it to a DataFrame with the toDF() function, passing in the schema defined for the DataFrame.

Syntax:

df.toDF(schema)

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc,data):
  df = sc.parallelize(data)
  return df
  
# function to convert RDD to dataframe
def RDD_to_df(df,schema):
    
  # converting RDD to dataframe using toDF()
  # in which we are passing schema of df
  df1 = df.toDF(schema)
  return df1
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # calling function to convert RDD to dataframe
  converted_df = RDD_to_df(rd_df,schema_lst)
    
  # visualizing the schema and dataframe
  converted_df.printSchema()
  converted_df.show()

Output:
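One more note on toDF(): if it is called without a schema, Spark generates default column names (_1, _2, ...) for an RDD of tuples. Below is a minimal sketch (reusing one record from the example data) that shows the two variants side by side:

Python

# a minimal sketch: toDF() with and without a schema
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Corona_cases_statewise.com") \
    .getOrCreate()

rdd = spark.sparkContext.parallelize([("Kerala", 153000, 124000, 5259)])

# no schema: Spark falls back to default column names _1, _2, _3, _4
df_default = rdd.toDF()

# schema given as a list of column names, as in the example above
df_named = rdd.toDF(["State", "Cases", "Recovered", "Deaths"])

df_default.printSchema()
df_named.printSchema()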