📅  最后修改于: 2023-12-03 14:50:15.640000             🧑  作者: Mango
Pyspark是Python API,它与Apache Spark交互,可以通过pyspark创建Spark DataFrame。数据框是一种下一代二维表,它支持大规模数据处理和复杂的数据分析。
在创建DataFrame之前,必须首先创建SparkSession对象。SparkSession是与Spark集群通信的关键入口点,可以帮助您管理您的应用程序上下文和Spark执行环境。
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CreatingDataFrame").getOrCreate()
可以使用以下两种方法创建数据框:
您可以使用从RDD创建DataFrame的方式,以下是基本语法:
from pyspark.sql import Row
# create an RDD
rdd = spark.sparkContext.parallelize([(1, "John"), (2, "Bob"), (3, "Alice")])
# convert RDD to a DataFrame
df = rdd.map(lambda x: Row(id=x[0], name=x[1])).toDF()
# show the DataFrame
df.show()
+---+-----+
| id| name|
+---+-----+
| 1| John|
| 2| Bob|
| 3|Alice|
+---+-----+
在上面的示例中,先创建了一个RDD,然后使用map和toDF转换为DataFrame。可以看到,结果DataFrame拥有两列(对应于RDD中的两个数据项),始终将该列视为字符串类型。
您可以使用schema显式定义列名和列数据类型来创建DataFrame。以下是基本语法:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# create a schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
# create a DataFrame
df = spark.createDataFrame([(1, "John"), (2, "Bob"), (3, "Alice")], schema)
# show the DataFrame
df.show()
+---+-----+
| id| name|
+---+-----+
| 1| John|
| 2| Bob|
| 3|Alice|
+---+-----+
在上面的示例中,首先定义了一个schema,该schema定义了两列:“id”和“name”的名称和类型。然后将数据集和schema传递给createDataFrame方法。
DataFrame API提供了各种转换和操作方法,例如选择,过滤和聚合。以下是一些使用DataFrame API的示例:
# select columns
df.select("name").show()
+-----+
| name|
+-----+
| John|
| Bob|
|Alice|
+-----+
# filter rows
df.filter(df["id"] > 1).show()
+---+-----+
| id| name|
+---+-----+
| 2| Bob|
| 3|Alice|
+---+-----+
# group by and count
df.groupBy("name").count().show()
+-----+-----+
| name|count|
+-----+-----+
|Alice| 1|
| John| 1|
| Bob| 1|
+-----+-----+
这是使用pyspark创建DataFrame的基本介绍。DataFrame是大规模数据集的最佳选择,可以大大提高数据的处理和分析速度。Pyspark提供了强大的API,让您可以轻松地对DataFrame进行转换和操作,并处理大规模数据集。