Defining DataFrame Schema with StructField and StructType
In this article, we will learn how to define a DataFrame schema using StructField and StructType.
- StructType and StructField are used to define the schema of a DataFrame, or of part of it. They specify the name, data type, and nullable flag of each column.
- A StructType object is a collection of StructField objects. It is a built-in data type that holds a list of StructFields.
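As a quick illustration of that relationship (a sketch, not part of the original article; the column names used here are made up), a StructType can be built from StructField objects and its fields inspected directly:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# a StructType is simply a container of StructField objects
schema = StructType([
    StructField("name", StringType(), True),
    StructField("price", IntegerType(), True),
])

print(schema.fieldNames())       # list of column names defined in the schema
print(schema["price"].dataType)  # data type of a single field, looked up by name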
Syntax:
- pyspark.sql.types.StructType(fields=None)
- pyspark.sql.types.StructField(name, datatype, nullable=True)
Parameters:
- fields – List of StructField.
- name – Name of the column.
- datatype – Type of the data, i.e. Integer, String, Float, etc.
- nullable – Whether the field can be NULL/None or not.
To define a schema, we use a StructType() object, into which we pass a list of StructField() objects, each holding the column name, the column's data type, and the nullable flag. We can write:
schema = StructType([StructField(column_name1, datatype(), nullable_flag),
                     StructField(column_name2, datatype(), nullable_flag),
                     StructField(column_name3, datatype(), nullable_flag)
                     ])
Example 1: Defining a DataFrame with a schema using StructType and StructField.
Python
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType

# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk

# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1

if __name__ == "__main__":

    # calling function to create SparkSession
    spark = create_session()

    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]

    # defining schema for the dataframe with
    # StructType and StructField
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True),
    ])

    # calling function to create dataframe
    df = create_df(spark, input_data, schm)

    # visualizing dataframe and its schema
    df.printSchema()
    df.show()
Output:
In the above code we set the nullable flag to True. The point of setting it to True is that if any field value is NULL/None while the DataFrame is being created, the DataFrame is still created and that value is simply left empty (null).
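As a rough illustration of the nullable flag (a sketch, not part of the original example; the schema and column names here are made up), marking a column as non-nullable makes Spark reject rows that carry None in that column:
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.master("local").appName("nullable_demo").getOrCreate()

# hypothetical schema: "price" is declared non-nullable
schm = StructType([
    StructField("name", StringType(), True),     # None is allowed here
    StructField("price", IntegerType(), False),  # None is NOT allowed here
])

# this row is accepted: only the nullable column is None
spark.createDataFrame([(None, 999)], schm).show()

# this row would raise an error, because the non-nullable "price" column receives None
# spark.createDataFrame([("Face Mask", None)], schm).show()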
Example 2: Defining a DataFrame schema with a nested StructType.
Python
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType

# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk

# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1

if __name__ == "__main__":

    # calling function to create SparkSession
    spark = create_session()

    input_data = [(("Refrigerator", 112345), 4.0, 12499),
                  (("LED TV", 114567), 4.2, 49999),
                  (("Washing Machine", 113465), 3.9, 69999),
                  (("T-shirt", 124378), 4.1, 1999),
                  (("Jeans", 126754), 3.7, 3999),
                  (("Running Shoes", 134565), 4.7, 1499),
                  (("Face Mask", 145234), 4.6, 999)]

    # defining schema for the dataframe using
    # nested StructType
    schm = StructType([
        StructField('Product', StructType([
            StructField('Product Name', StringType(), True),
            StructField('Product ID', LongType(), True),
        ])),
        StructField('Rating', FloatType(), True),
        StructField('Price', IntegerType(), True)])

    # calling function to create dataframe
    df = create_df(spark, input_data, schm)

    # visualizing dataframe and its schema
    df.printSchema()
    df.show(truncate=False)
Output:
Example 3: Changing the structure of the DataFrame and adding a new column using the PySpark Column class.
Python
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, struct, when
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType

# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk

# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1

if __name__ == "__main__":

    # calling function to create SparkSession
    spark = create_session()

    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]

    # defining a flat schema for the dataframe with
    # StructType and StructField
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True)])

    # calling function to create dataframe
    df = create_df(spark, input_data, schm)

    # copying the columns to the new struct
    # Product
    new_df = df.withColumn("Product",
                           struct(col("Product Name").alias("Name"),
                                  col("Product ID").alias("ID"),
                                  col("Rating").alias("Rating"),
                                  col("Product Price").alias("Price")))

    # adding new column according to the given
    # condition
    new_df = new_df.withColumn("Product Range",
                               when(col("Product Price").cast(IntegerType()) < 1000, "Low")
                               .when(col("Product Price").cast(IntegerType()) < 7000, "Medium")
                               .otherwise("High"))

    # dropping the columns as all column values
    # are copied in Product column
    new_df = new_df.drop("Product Name", "Product ID",
                         "Rating", "Product Price")

    # visualizing dataframe and its schema
    new_df.printSchema()
    new_df.show(truncate=False)
Output:
- In the above example, we change the structure of the DataFrame with the struct() function: the columns "Product Name", "Product ID", "Rating", and "Product Price" are copied into a new struct column "Product", which is created with the withColumn() function.
- We then add a new column "Product Range" with the withColumn() function, which classifies each product into one of three categories according to the given condition: if "Product Price" is less than 1000 the product falls into the Low category, if it is less than 7000 it falls into the Medium category, and otherwise it falls into the High category.
- After creating the new struct "Product" and adding the "Product Range" column, we drop the "Product Name", "Product ID", "Rating", and "Product Price" columns with the drop() function, since their values have already been copied into "Product". Finally, we print the schema of the restructured DataFrame together with the added column. The nested fields can still be queried with dot notation, as shown in the sketch below.
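As a brief follow-up (a sketch, not part of the original example; it assumes the new_df built above), the fields nested inside the "Product" struct can be selected with dot notation, or expanded back into top-level columns:
Python
# selecting individual fields from the nested "Product" struct
new_df.select("Product.Name", "Product.Price", "Product Range").show(truncate=False)

# flattening the whole struct back into top-level columns
new_df.select("Product.*", "Product Range").show(truncate=False)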
Example 4: Defining a DataFrame schema in JSON format with StructType().
Python
# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import json

# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk

# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1

if __name__ == "__main__":

    # calling function to create SparkSession
    spark = create_session()

    input_data = [("Refrigerator", 4.0),
                  ("LED TV", 4.2),
                  ("Washing Machine", 3.9),
                  ("T-shirt", 4.1)
                  ]

    # defining schema for the dataframe with
    # StructType and StructField
    schm = T.StructType([
        T.StructField("Product Name", T.StringType(), True),
        T.StructField("Rating", T.FloatType(), True)
    ])

    # calling function to create dataframe
    df = create_df(spark, input_data, schm)

    # visualizing dataframe and its schema
    print("Original Dataframe:-")
    df.printSchema()
    df.show()

    print("-------------------------------------------")
    print("Schema in json format:-")

    # storing schema in json format using
    # schema.json() function
    schma = df.schema.json()
    print(schma)

    # loading the json format schema
    # (StructType is referenced through the T alias imported above)
    schm1 = T.StructType.fromJson(json.loads(schma))

    # creating dataframe using json format schema
    json_df = spark.createDataFrame(
        spark.sparkContext.parallelize(input_data), schm1)

    print("-------------------------------------------")
    print("Dataframe using json schema:-")

    # showing the dataframe created from the json format schema
    # and printing the schema of the created dataframe
    json_df.printSchema()
    json_df.show()
Output:
Note: You can also store the JSON-format schema in a file and use that file to define the schema. The code stays essentially the same as above, except that you read the file and pass its contents to the json.loads() function (or call json.load() on the file object). In the example above the JSON-format schema is kept in a variable, and that variable is used to define the schema; a sketch of the file-based variant follows.
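A minimal sketch of that file-based variant (an assumption, not part of the original example; the file name product_schema.json is made up, and df, spark, and input_data refer to the objects created in Example 4):
Python
import json
import pyspark.sql.types as T

# write the schema produced by df.schema.json() to a file (hypothetical file name)
with open("product_schema.json", "w") as f:
    f.write(df.schema.json())

# later, read the file back and rebuild the StructType from it
with open("product_schema.json") as f:
    schema_from_file = T.StructType.fromJson(json.load(f))

# create a dataframe with the schema loaded from the file
file_df = spark.createDataFrame(input_data, schema_from_file)
file_df.printSchema()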
Example 5: Defining a DataFrame schema with StructType(), ArrayType(), and MapType().
Python
# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T

# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk

# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1

if __name__ == "__main__":

    # calling function to create SparkSession
    spark = create_session()

    # data containing the Array and the Map key-value pairs
    input_data = [
        ("Alex", 'Buttler', ["Mathematics", "Computer Science"],
         {"Mathematics": 'Physics', "Chemistry": "Biology"}),
        ("Sam", "Samson", ["Chemistry", "Biology"],
         {"Chemistry": 'Physics', "Mathematics": "Biology"}),
        ("Rossi", "Bryant", ["English", "Geography"],
         {"History": 'Geography', "Chemistry": "Biology"}),
        ("Sidz", "Murraz", ["History", "Environmental Science"],
         {"English": 'Environmental Science', "Chemistry": "Mathematics"}),
        ("Robert", "Cox", ["Physics", "English"],
         {"Computer Science": 'Environmental Science', "Chemistry": "Geography"})
    ]

    # defining schema with ArrayType and MapType()
    # using StructType() and StructField()
    # (the types are referenced through the T alias imported above)
    array_schm = T.StructType([
        T.StructField('Firstname', T.StringType(), True),
        T.StructField('Lastname', T.StringType(), True),
        T.StructField('Subject', T.ArrayType(T.StringType()), True),
        T.StructField('Subject Combinations', T.MapType(
            T.StringType(), T.StringType()), True)
    ])

    # calling function for creating the dataframe
    df = create_df(spark, input_data, array_schm)

    # printing schema of df and showing dataframe
    df.printSchema()
    df.show(truncate=False)
Output:
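As a brief follow-up (a sketch, not part of the original example; it assumes the df built in Example 5), individual elements of the ArrayType column and single lookups in the MapType column can be pulled out with getItem():
Python
from pyspark.sql.functions import col

# first element of the ArrayType column and one key lookup in the MapType column
df.select(
    col("Firstname"),
    col("Subject").getItem(0).alias("First Subject"),
    col("Subject Combinations").getItem("Chemistry").alias("Chemistry Combination")
).show(truncate=False)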