📜  PySpark – 将 CSV 文件读入 DataFrame

📅  最后修改于: 2022-05-13 01:55:02.561000             🧑  作者: Mango

PySpark – 将 CSV 文件读入 DataFrame

在本文中,我们将看到如何将 CSV 文件读入 Dataframe。为此,我们将使用 Pyspark 和Python .ala。

使用的文件:

  • 作者
  • 书作者
  • 图书

将 CSV 文件读入 DataFrame

在这里,我们将使用 spark.read.csv 将单个 CSV 读入数据帧,然后使用 .toPandas() 使用此数据创建数据帧。

Python3
from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName(
    'Read CSV File into DataFrame').getOrCreate()
  
authors = spark.read.csv('/content/authors.csv', sep=',',
                         inferSchema=True, header=True)
  
df = authors.toPandas()
df.head()


Python3
from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName('Read Multiple CSV Files').getOrCreate()
  
path = ['/content/authors.csv',
        '/content/book_author.csv']
  
files = spark.read.csv(path, sep=',',
                       inferSchema=True, header=True)
  
df1 = files.toPandas()
display(df1.head())
display(df1.tail())


Python3
from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName(
    'Read All CSV Files in Directory').getOrCreate()
  
f2 = spark.read.csv('/content/*.csv', sep=',', 
                    inferSchema=True, header=True)
  
df1 = file2.toPandas()
display(df1.head())
display(df1.tail())


输出:



在这里,我们传递了我们的 CSV 文件authors.csv。其次,我们传递了 CSV 文件中使用的分隔符。这里的分隔符是逗号', '。接下来,我们将inferSchema属性设置为True ,这将遍历 CSV 文件并自动将其架构调整为 PySpark Dataframe。然后,我们使用toPandas()方法将 PySpark Dataframe 转换为 Pandas Dataframe df

读取多个 CSV 文件

要读取多个 CSV 文件,我们将以字符串类型传递 CSV 文件路径的Python列表。

蟒蛇3

from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName('Read Multiple CSV Files').getOrCreate()
  
path = ['/content/authors.csv',
        '/content/book_author.csv']
  
files = spark.read.csv(path, sep=',',
                       inferSchema=True, header=True)
  
df1 = files.toPandas()
display(df1.head())
display(df1.tail())

输出:

在这里,我们导入了存在于同一当前工作目录中的authors.csv 和book_author.csv,分隔符为逗号' , ',第一行为Header。

读取目录中的所有 CSV 文件

要读取目录中的所有 CSV 文件,我们将使用*来考虑目录中的每个文件。

蟒蛇3

from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName(
    'Read All CSV Files in Directory').getOrCreate()
  
f2 = spark.read.csv('/content/*.csv', sep=',', 
                    inferSchema=True, header=True)
  
df1 = file2.toPandas()
display(df1.head())
display(df1.tail())

输出:

这将读取当前工作目录中存在的所有CSV文件,分隔符为逗号“ , ”,第一行为标题。