Creating a PySpark DataFrame
In this article, we will learn how to create a PySpark DataFrame. A PySpark application starts by initializing a SparkSession, the entry point of PySpark, as shown below.
# SparkSession initialization
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Note: The PySpark shell, started via the pyspark executable, automatically creates the session in the variable spark for the user, so these examples can also be run directly in the shell.
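For more control over the session, the builder also accepts an application name and a master URL before getOrCreate() is called. A minimal sketch (the app name below is just a placeholder):
Python3
# optional builder configuration; both settings can be omitted
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("create-dataframe-demo") \
    .master("local[*]") \
    .getOrCreate()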
A PySpark DataFrame is usually created via pyspark.sql.SparkSession.createDataFrame. The method takes a schema argument to specify the schema of the DataFrame; when it is omitted, PySpark infers the schema by sampling the data.
Syntax
pyspark.sql.SparkSession.createDataFrame()
Parameters:
- data: An RDD of any kind of SQL data representation (e.g. Row, tuple, int, boolean, etc.), or list, or pandas.DataFrame.
- schema: A datatype string or a list of column names; default is None.
- samplingRatio: The ratio of rows sampled for schema inference.
- verifySchema: Verify the data types of every row against the schema. Enabled by default.
Returns: DataFrame
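The two optional parameters are easy to overlook, so here is a minimal sketch of both (toy data, not from the article; it assumes the spark session created above): verifySchema validates each row against the declared schema, while samplingRatio controls how much of an RDD is sampled when the schema has to be inferred.
Python3
# verifySchema (default True) raises an error on a type mismatch
df1 = spark.createDataFrame([(1, 'a'), (2, 'b')],
                            schema='id long, label string',
                            verifySchema=True)
# samplingRatio applies when inferring a schema from an RDD
rdd = spark.sparkContext.parallelize([(1, 'a'), (2, 'b')])
df2 = spark.createDataFrame(rdd, samplingRatio=0.5)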
The different ways to create a PySpark DataFrame are shown below:
Create PySpark DataFrame from a list of Rows
In this example, we create the PySpark DataFrame from a list of Rows. We supply a value for each variable (feature) in every Row and pass the Rows to the DataFrame object. Once this is done, we display the DataFrame and its schema.
Python3
# needed for date and timestamp values
from datetime import datetime, date
# Row carries named fields for each record
from pyspark.sql import Row
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# create a DataFrame from a list of Rows; the schema is inferred
df = spark.createDataFrame([
Row(a=1, b=4., c='GFG1', d=date(2000, 8, 1),
e=datetime(2000, 8, 1, 12, 0)),
Row(a=2, b=8., c='GFG2', d=date(2000, 6, 2),
e=datetime(2000, 6, 2, 12, 0)),
Row(a=4, b=5., c='GFG3', d=date(2000, 5, 3),
e=datetime(2000, 5, 3, 12, 0))
])
# show table
df.show()
# show schema
df.printSchema()
Output:
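As a side note, Row can also act as a reusable record template, so the field names are declared only once. A small sketch (reusing the spark session from above, with shortened toy rows):
Python3
# Row as a template: declare the field names once, reuse per record
from pyspark.sql import Row
Record = Row('a', 'b', 'c')
df = spark.createDataFrame([Record(1, 4., 'GFG1'),
                            Record(2, 8., 'GFG2')])
df.show()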
Create PySpark DataFrame with an explicit schema
In this example, we create the PySpark DataFrame with an explicit schema. We supply the feature values in each row as tuples and pass them to the DataFrame object together with a schema for the variables (features). Once this is done, we display the DataFrame and its schema.
Python3
# needed for date and timestamp values
from datetime import datetime, date
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# PySpark DataFrame with an explicit schema
df = spark.createDataFrame([
(1, 4., 'GFG1', date(2000, 8, 1),
datetime(2000, 8, 1, 12, 0)),
(2, 8., 'GFG2', date(2000, 6, 2),
datetime(2000, 6, 2, 12, 0)),
(3, 5., 'GFG3', date(2000, 5, 3),
datetime(2000, 5, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
# show table
df.show()
# show schema
df.printSchema()
Output:
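The DDL string above can equivalently be written with the programmatic StructType API, which is handy when the schema has to be built dynamically. A sketch with the same column types (reusing spark, date and datetime from the block above, with one sample row):
Python3
# the same explicit schema, built from StructType instead of a DDL string
from pyspark.sql.types import (StructType, StructField, LongType,
                               DoubleType, StringType, DateType,
                               TimestampType)
schema = StructType([
    StructField('a', LongType(), True),
    StructField('b', DoubleType(), True),
    StructField('c', StringType(), True),
    StructField('d', DateType(), True),
    StructField('e', TimestampType(), True),
])
df = spark.createDataFrame([(1, 4., 'GFG1', date(2000, 8, 1),
                             datetime(2000, 8, 1, 12, 0))], schema=schema)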
Create PySpark DataFrame from a pandas DataFrame
In this example, we create the PySpark DataFrame from a pandas DataFrame. For each feature we supply a list of values, one per row of that column, and build the DataFrame from them. Once this is done, we display the DataFrame and its schema.
Python3
# needed for date and timestamp values
from datetime import datetime, date
# pandas is used to build the source DataFrame
import pandas as pd
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# PySpark DataFrame from a pandas DataFrame
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [4., 8., 5.],
'c': ['GFG1', 'GFG2', 'GFG3'],
'd': [date(2000, 8, 1), date(2000, 6, 2),
date(2000, 5, 3)],
'e': [datetime(2000, 8, 1, 12, 0),
datetime(2000, 6, 2, 12, 0),
datetime(2000, 5, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
# show table
df.show()
# show schema
df.printSchema()
Output:
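If the pyarrow package is installed, enabling Apache Arrow usually speeds up the pandas-to-Spark conversion considerably. A sketch (reusing spark and pandas_df from the block above; the config key is the Spark 3.x name):
Python3
# opt in to Arrow-based conversion (requires pyarrow)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
df = spark.createDataFrame(pandas_df)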
Create PySpark DataFrame from an RDD
In this example, we create the PySpark DataFrame from a list of tuples. We build an RDD with the parallelize() method, supplying the feature values of each row, and pass it to the DataFrame object together with the names of the variables (features). Once this is done, we display the DataFrame and its schema.
Python3
# needed for date and timestamp values
from datetime import datetime, date
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# build an RDD of tuples, one tuple per row
rdd = spark.sparkContext.parallelize([
(1, 4., 'GFG1', date(2000, 8, 1), datetime(2000, 8, 1, 12, 0)),
(2, 8., 'GFG2', date(2000, 6, 2), datetime(2000, 6, 2, 12, 0)),
(3, 5., 'GFG3', date(2000, 5, 3), datetime(2000, 5, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
# show table
df.show()
# show schema
df.printSchema()
Output:
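An RDD of tuples can also be converted directly with toDF(), which takes the column names as its argument. A sketch reusing the rdd defined above:
Python3
# toDF() is shorthand for createDataFrame on an existing RDD
df = rdd.toDF(['a', 'b', 'c', 'd', 'e'])
df.show()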
Create PySpark DataFrame from CSV
In this example, we create the PySpark DataFrame from a CSV file. We open the CSV file and load its contents into the DataFrame object. Once this is done, we display the DataFrame and its schema.
CSV used: train_dataset
Python3
# pandas is used to read the CSV file
import pandas as pd
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# PySpark DataFrame from a CSV file, read via pandas
df = spark.createDataFrame(pd.read_csv('data.csv'))
# show table
df.show()
# show schema
df.printSchema()
Output:
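Note that routing the file through pandas loads everything on the driver first; for larger files Spark's own reader is usually preferable because it reads in parallel. A sketch with the native DataFrameReader:
Python3
# native CSV reader: no pandas round trip
df = spark.read.csv('data.csv', header=True, inferSchema=True)
df.show()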
Create PySpark DataFrame from a text file
In this example, we create the PySpark DataFrame from a text file. We open a text file with tab-separated values and load its contents into the DataFrame object. Once this is done, we display the DataFrame and its schema.
File used:
Python3
# pandas is used to read the text file
import pandas as pd
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# PySpark DataFrame from a tab-delimited text file, read via pandas
df = spark.createDataFrame(pd.read_csv('data.txt', delimiter="\t"))
# show table
df.show()
# show schema
df.printSchema()
Output:
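The same tab-separated file can be read natively by passing the separator to the CSV reader. A sketch:
Python3
# native reader for a tab-delimited file
df = spark.read.csv('data.txt', sep='\t', header=True, inferSchema=True)
df.show()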
Create PySpark DataFrame from JSON
In this example, we create the PySpark DataFrame from a JSON file. We open the JSON file and load its contents into the DataFrame object. Once this is done, we display the DataFrame and its schema.
JSON used:
Python3
# pandas is used to read the JSON file
import pandas as pd
# needed for session creation
from pyspark.sql import SparkSession
# creating the session
spark = SparkSession.builder.getOrCreate()
# PySpark DataFrame from a JSON file, read via pandas
df = spark.createDataFrame(pd.read_json('data.json'))
# show table
df.show()
# show schema
df.printSchema()
Output:
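Spark also ships a native JSON reader. Note that by default it expects one JSON object per line; a single pretty-printed document needs multiLine=True. A sketch:
Python3
# native JSON reader; expects line-delimited JSON by default
df = spark.read.json('data.json')
df.show()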
These are the ways of creating a PySpark DataFrame. The datasets used in the programs above follow.