How to check whether something is an RDD or a DataFrame in PySpark?
In this article, we will check whether data is an RDD or a DataFrame using the isinstance(), type(), and dispatch methods.
Method 1: Using the isinstance() method
It is used to check whether particular data is an RDD or a DataFrame. It returns a boolean value.
Syntax: isinstance(data, DataFrame/RDD)
where
- data is our input data
- DataFrame is the class from the pyspark.sql module
- RDD is the class from the pyspark.rdd module
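For a quick feel for these checks before the full programs below, here is a minimal, self-contained sketch (the df and rdd objects are illustrative placeholders created on the spot):
Python3
# a minimal sketch: build one DataFrame and one RDD,
# then test each against both classes
from pyspark.sql import SparkSession, DataFrame
from pyspark.rdd import RDD

spark = SparkSession.builder.getOrCreate()

df = spark.range(3)                               # a tiny DataFrame
rdd = spark.sparkContext.parallelize([1, 2, 3])   # a tiny RDD

print(isinstance(df, DataFrame))   # True
print(isinstance(df, RDD))         # False
print(isinstance(rdd, RDD))        # True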
Example program to check whether our data is a DataFrame:
Python3
# importing module
import pyspark

# import DataFrame
from pyspark.sql import DataFrame

# importing sparksession
# from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession
# and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [[1, "sravan", "company 1"],
        [2, "ojaswi", "company 1"],
        [3, "rohith", "company 2"],
        [4, "sridevi", "company 1"],
        [1, "sravan", "company 1"],
        [4, "sridevi", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# check if it is a dataframe or not
print(isinstance(dataframe, DataFrame))
Output:
True
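Continuing with the dataframe object from the example above, the complementary check returns False, since a PySpark DataFrame is not an RDD:
Python3
# the same DataFrame is not an RDD
from pyspark.rdd import RDD

print(isinstance(dataframe, RDD))   # False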
Checking whether the data is an RDD:
We can check this by using the isinstance() method.
Syntax: isinstance(data, RDD)
where
- data is our input data
- RDD is the class from the pyspark.rdd module
Example:
Python3
# import DataFrame
from pyspark.sql import DataFrame

# import RDD
from pyspark.rdd import RDD

# need to import for session creation
from pyspark.sql import SparkSession

# creating the spark session
spark = SparkSession.builder.getOrCreate()

# create an rdd with some data
data = spark.sparkContext.parallelize([("1", "sravan", "vignan", 67, 89),
                                       ("2", "ojaswi", "vvit", 78, 89),
                                       ("3", "rohith", "vvit", 100, 80),
                                       ("4", "sridevi", "vignan", 78, 80),
                                       ("1", "sravan", "vignan", 89, 98),
                                       ("5", "gnanesh", "iit", 94, 98)])

# check whether the data is an RDD or not
print(isinstance(data, RDD))
Output:
True
Converting an RDD to a DataFrame and checking the type
Here, we will create an RDD, convert it to a DataFrame using the toDF() method, and check both objects.
Python3
# import DataFrame
from pyspark.sql import DataFrame

# import RDD
from pyspark.rdd import RDD

# need to import for session creation
from pyspark.sql import SparkSession

# creating the spark session
spark = SparkSession.builder.getOrCreate()

# create an rdd with some data
rdd = spark.sparkContext.parallelize([(1, "Sravan", "vignan", 98),
                                      (2, "bobby", "bsc", 87)])

# check if it is an RDD
print("RDD : ", isinstance(rdd, RDD))

# check if it is a DataFrame
print("Dataframe : ", isinstance(rdd, DataFrame))

# display data of rdd
print("Rdd Data : \n", rdd.collect())

# convert rdd to dataframe
data = rdd.toDF()

# check if the converted object is an RDD
print("RDD : ", isinstance(data, RDD))

# check if the converted object is a DataFrame
print("Dataframe : ", isinstance(data, DataFrame))

# display dataframe
print("Dataframe Data : \n", data.collect())
Output:
RDD :  True
Dataframe :  False
Rdd Data : 
 [(1, 'Sravan', 'vignan', 98), (2, 'bobby', 'bsc', 87)]
RDD :  False
Dataframe :  True
Dataframe Data : 
 [Row(_1=1, _2='Sravan', _3='vignan', _4=98), Row(_1=2, _2='bobby', _3='bsc', _4=87)]
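As a side note, toDF() can also take a list of column names instead of producing the default _1, _2, ... columns; a small sketch (the column names here are illustrative):
Python3
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the same rdd as above
rdd = spark.sparkContext.parallelize([(1, "Sravan", "vignan", 98),
                                      (2, "bobby", "bsc", 87)])

# pass explicit column names to toDF()
df = rdd.toDF(["ID", "NAME", "College", "Marks"])
df.show()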
Method 2: Using the type() function
The type() function returns the type of the given object.
Syntax: type(data_object)
Here, data_object is the RDD or DataFrame data.
Example 1: Python program to create data using an RDD and check its type
Python3
# need to import for session creation
from pyspark.sql import SparkSession

# creating the spark session
spark = SparkSession.builder.getOrCreate()

# create an rdd with some data
rdd = spark.sparkContext.parallelize([(1, "Sravan", "vignan", 98),
                                      (2, "bobby", "bsc", 87)])

# check the type using the type() command
print(type(rdd))
Output:
<class 'pyspark.rdd.RDD'>
Example 2: Python program to create a DataFrame and check its type.
Python3
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# list of employee data
data = [[1, "sravan", "company 1"],
        [2, "ojaswi", "company 1"],
        [3, "rohith", "company 2"],
        [4, "sridevi", "company 1"],
        [1, "sravan", "company 1"],
        [4, "sridevi", "company 1"]]

# specify column names
columns = ['ID', 'NAME', 'Company']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

# check the type of the
# data with the type() command
print(type(dataframe))
Output:
<class 'pyspark.sql.dataframe.DataFrame'>
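One nuance worth knowing: type() reports the exact concrete class, while isinstance() also matches subclasses, so isinstance() is usually the safer check. A minimal sketch combining the two:
Python3
from pyspark.sql import SparkSession, DataFrame
from pyspark.rdd import RDD

spark = SparkSession.builder.getOrCreate()
obj = spark.createDataFrame([(1, "sravan")], ["ID", "NAME"])

# type() gives the concrete class name;
# isinstance() accepts a tuple of candidate classes
print(type(obj).__name__)                 # DataFrame
print(isinstance(obj, (RDD, DataFrame)))  # True for either kind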
Method 3: Using dispatch
The singledispatch decorator from functools turns a function into a generic function whose behavior is selected by the type of its first argument: the decorator creates a dispatcher object under the function's name, and we register one implementation per type on that object. Here we use it to report whether our data is an RDD or a DataFrame.
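To see the mechanics in isolation before applying them to PySpark, here is a minimal sketch of singledispatch with plain Python types:
Python3
from functools import singledispatch

# the base function is the fallback for unregistered types
@singledispatch
def describe(x):
    return "unknown"

# one implementation per registered type
@describe.register(int)
def _(x):
    return "int"

@describe.register(str)
def _(x):
    return "str"

print(describe(10))      # int
print(describe("ten"))   # str
print(describe(1.5))     # unknown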
Example 1: Python code to create a single dispatcher, pass it data, and check whether the data is an RDD
Python3
# importing module
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# import singledispatch
from functools import singledispatch

# import spark context
from pyspark import SparkContext

# create an object for spark
# context with local and name is GFG
sc = SparkContext("local", "GFG")

# creating sparksession
# and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# create a function to dispatch our function
@singledispatch
def check(x):
    pass

# this function returns the string
# "RDD" if the given input is an RDD
@check.register(RDD)
def _(arg):
    return "RDD"

# this function returns the string
# "DataFrame" if the given input is a DataFrame
@check.register(DataFrame)
def _(arg):
    return "DataFrame"

# create a pyspark RDD
# and check whether it is an RDD or not
print(check(sc.parallelize([("1", "sravan", "vignan", 67, 89)])))
Output:
RDD
Example 2: Python code to check whether the data is a DataFrame
Python3
# importing module
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# import singledispatch
from functools import singledispatch

# import spark context
from pyspark import SparkContext

# create an object for spark
# context with local and name is GFG
sc = SparkContext("local", "GFG")

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

# create a function to dispatch our function
@singledispatch
def check(x):
    pass

# this function returns the string
# "RDD" if the given input is an RDD
@check.register(RDD)
def _(arg):
    return "RDD"

# this function returns the string
# "DataFrame" if the given input is a DataFrame
@check.register(DataFrame)
def _(arg):
    return "DataFrame"

# create a pyspark dataframe and
# check whether it is a dataframe or not
print(check(spark.createDataFrame([("1", "sravan",
                                    "vignan", 67, 89)])))
Output:
DataFrame
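One detail to keep in mind: the base check() above only passes, so calling it with anything that is neither an RDD nor a DataFrame silently returns None. A small sketch of a more explicit fallback:
Python3
from functools import singledispatch
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

@singledispatch
def check(x):
    # explicit fallback instead of silently returning None
    return "neither RDD nor DataFrame: " + type(x).__name__

@check.register(RDD)
def _(arg):
    return "RDD"

@check.register(DataFrame)
def _(arg):
    return "DataFrame"

print(check([1, 2, 3]))   # neither RDD nor DataFrame: list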