
📅  Last modified: 2022-05-13 01:55:13    🧑  Author: Mango

How to check if something is an RDD or a DataFrame in PySpark?

In this article, we will check whether the data is an RDD or a DataFrame using the isinstance(), type(), and dispatch methods.

Method 1: Using the isinstance() method

It is used to check whether the given data is an RDD or a DataFrame, and it returns a Boolean value.

Example program to check whether our data is a DataFrame:



Python3
# importing module
import pyspark
  
#import DataFrame
from pyspark.sql import DataFrame
  
# importing sparksession
# from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession
# and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list of employee data
data = [[1, "sravan", "company 1"],
        [2, "ojaswi", "company 1"],
        [3, "rohith", "company 2"],
        [4, "sridevi", "company 1"],
        [1, "sravan", "company 1"],
        [4, "sridevi", "company 1"]]
  
# specify column names
columns = ['ID', 'NAME', 'Company']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
  
# check if it is dataframe or not
print(isinstance(dataframe, DataFrame))


Output:

True

Checking whether the data is an RDD:

We can check this by using the isinstance() method.

Example:

Python3

# import DataFrame
from pyspark.sql import DataFrame
  
# import RDD
from pyspark.rdd import RDD
  
# need to import for session creation
from pyspark.sql import SparkSession
  
# creating the  spark session
spark = SparkSession.builder.getOrCreate()
  
# create an rdd with some data
data = spark.sparkContext.parallelize([("1", "sravan", "vignan", 67, 89),
                                       ("2", "ojaswi", "vvit", 78, 89),
                                       ("3", "rohith", "vvit", 100, 80),
                                       ("4", "sridevi", "vignan", 78, 80),
                                       ("1", "sravan", "vignan", 89, 98),
                                       ("5", "gnanesh", "iit", 94, 98)])
  
# check whether the data is an RDD or not
print(isinstance(data, RDD))

Output:



True
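
Since isinstance() also accepts a tuple of types, both possibilities can be tested in a single call. This is a plain Python feature, not PySpark-specific; a minimal sketch reusing the rdd stored in data above:

Python3

# import RDD and DataFrame for the combined check
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

# True if the object is either an RDD or a DataFrame
print(isinstance(data, (RDD, DataFrame)))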

Converting an RDD to a DataFrame and checking the type

Here we will create an RDD, convert it into a DataFrame using the toDF() method, and then check the data.

Python3

# import DataFrame
from pyspark.sql import DataFrame
  
# import RDD
from pyspark.rdd import RDD
  
# need to import for session creation
from pyspark.sql import SparkSession
  
# creating the  spark session
spark = SparkSession.builder.getOrCreate()
  
# create an rdd with some data
rdd = spark.sparkContext.parallelize([(1, "Sravan", "vignan", 98),
                                      (2, "bobby", "bsc", 87)])
  
# check if it is an RDD
print(" RDD : ", isinstance(rdd, RDD))

# check if it is a DataFrame
print("Dataframe : ", isinstance(rdd, DataFrame))

# display the data of the rdd
print("Rdd Data : \n", rdd.collect())

# convert the rdd to a dataframe
data = rdd.toDF()

# check whether the converted data is still an RDD
print("RDD : ", isinstance(data, RDD))

# check whether the converted data is a DataFrame
print("Dataframe : ", isinstance(data, DataFrame))

# display the dataframe contents
print(data.collect())

Output:

 RDD :  True
Dataframe :  False
Rdd Data : 
 [(1, 'Sravan', 'vignan', 98), (2, 'bobby', 'bsc', 87)]
RDD :  False
Dataframe :  True
[Row(_1=1, _2='Sravan', _3='vignan', _4=98), Row(_1=2, _2='bobby', _3='bsc', _4=87)]
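
As a side note, toDF() also accepts a list of column names, so the converted DataFrame gets readable headers instead of the default _1, _2, ... names. A minimal sketch reusing the rdd above (the column names are illustrative, not from the original article):

Python3

# convert the rdd to a dataframe with explicit,
# illustrative column names
df_named = rdd.toDF(["id", "name", "college", "marks"])

# the names now appear in the schema
df_named.printSchema()
print(df_named.collect())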

Method 2: Using the type() function

The type() function is used to return the type of the given object.

Example 1: Python program to create data using an RDD and check its type

Python3

# need to import for session creation
from pyspark.sql import SparkSession
  
# creating the  spark session
spark = SparkSession.builder.getOrCreate()
  
# create an rdd with some data
rdd = spark.sparkContext.parallelize([(1, "Sravan", "vignan", 98),
                                      (2, "bobby", "bsc", 87)])
  
# check the type using type() command
print(type(rdd))

Output:

<class 'pyspark.rdd.RDD'>

Example 2: Python program to create a DataFrame and check its type.

Python3

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list of employee data
data = [[1, "sravan", "company 1"],
        [2, "ojaswi", "company 1"],
        [3, "rohith", "company 2"],
        [4, "sridevi", "company 1"],
        [1, "sravan", "company 1"],
        [4, "sridevi", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# check the type of the
# data with the type() command
print(type(dataframe))

Output:

<class 'pyspark.sql.dataframe.DataFrame'>
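
Note that type() reports the exact class, while isinstance() also matches subclasses, so isinstance() is generally the safer check when a library might hand back a subclass. A minimal comparison sketch reusing the dataframe created above:

Python3

from pyspark.sql import DataFrame

# type() gives the concrete class of the object
print(type(dataframe))

# comparing types directly matches only the exact class
print(type(dataframe) is DataFrame)

# isinstance() would also accept any subclass of DataFrame
print(isinstance(dataframe, DataFrame))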

Method 3: Using Dispatch

The dispatch decorator creates a dispatcher object registered under the decorated function's name and stores that object, which we can then reference to perform operations. Here we create one to check whether our data is an RDD or a DataFrame, so we use singledispatch.

Example 1: Python code to create a single dispatcher, pass it the data, and check whether the data is an RDD

Python3

# importing module
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import singledispatch
from functools import singledispatch
  
# import spark context
from pyspark import SparkContext
  
# create an object for the spark
# context in local mode with the name GFG
sc = SparkContext("local", "GFG")

# creating a sparksession
# and giving it an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# create a function to dispatch our function
@singledispatch
def check(x):
    pass
  
# this function returns the string
# "RDD" if the given input is an RDD
@check.register(RDD)
def _(arg):
    return "RDD"

# this function returns the string
# "DataFrame" if the given input is a DataFrame
@check.register(DataFrame)
def _(arg):
    return "DataFrame"

# create a pyspark RDD and
# check whether it is an RDD or not
print(check(sc.parallelize([("1", "sravan", "vignan", 67, 89)])))

Output:

RDD

Example 2: Python code to check whether the data is a DataFrame

Python3

# importing module
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# import singledispatch
from functools import singledispatch
  
# import spark context
from pyspark import SparkContext
  
# create an object for the spark
# context in local mode with the name GFG
sc = SparkContext("local", "GFG")

# creating a sparksession and giving it an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# create a function to dispatch our function
@singledispatch
def check(x):
    pass
  
# this function returns the string
# "RDD" if the given input is an RDD
@check.register(RDD)
def _(arg):
    return "RDD"

# this function returns the string
# "DataFrame" if the given input is a DataFrame
@check.register(DataFrame)
def _(arg):
    return "DataFrame"

# create a pyspark dataframe and
# check whether it is a dataframe or not
print(check(spark.createDataFrame([("1", "sravan",
                                    "vignan", 67, 89)])))

Output:

DataFrame
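
If the dispatcher may also receive objects that are neither an RDD nor a DataFrame, the base function can act as a fallback instead of silently returning None. A minimal sketch under that assumption (the "Neither" label is illustrative):

Python3

# import singledispatch and the two Spark types
from functools import singledispatch
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

@singledispatch
def check(x):
    # fallback for any unregistered type
    return "Neither"

@check.register(RDD)
def _(arg):
    return "RDD"

@check.register(DataFrame)
def _(arg):
    return "DataFrame"

# a plain Python list falls through to the base function
print(check([1, 2, 3]))  # prints: Neither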