📅  最后修改于: 2023-12-03 15:33:55.723000             🧑  作者: Mango
在 PySpark 中,合并两个具有不同列或模式的 DataFrame 是一项很常见的任务。本文将介绍如何执行此操作。
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 创建第一个 DataFrame
data1 = [(1, "a"), (2, "b"), (3, "c")]
schema1 = StructType([
StructField("id", IntegerType(), True),
StructField("value", StringType(), True)
])
df1 = spark.createDataFrame(data1, schema1)
# 创建第二个 DataFrame
data2 = [(4, "d", "foo"), (5, "e", "bar"), (6, "f", "baz")]
schema2 = StructType([
StructField("id", IntegerType(), True),
StructField("value", StringType(), True),
StructField("label", StringType(), True)
])
df2 = spark.createDataFrame(data2, schema2)
我们创建了两个 DataFrame,一个包含 id
和 value
两列,另一个包含 id
、value
和 label
三列。
如果两个 DataFrame 具有相同的列名,则我们可以使用 union
方法将它们合并起来,即使相应的列不同:
df_union = df1.union(df2)
df_union.show()
结果:
+---+-----+-----+
| id|value|label|
+---+-----+-----+
| 1| a| null|
| 2| b| null|
| 3| c| null|
| 4| d| foo|
| 5| e| bar|
| 6| f| baz|
+---+-----+-----+
在这个例子中,我们得到了一个具有三列的 DataFrame,其中来自第一个 DataFrame 的不匹配列 label
被填充为 null
。
当两个 DataFrame 具有不同的列名时,我们可以使用 select
、withColumn
和 union
方法来排列列,并将它们合并起来:
# 配置第二个 DataFrame 中缺失的列
df2_with_missing_cols = df2.withColumn("id_missing", lit(None).cast(IntegerType())) \
.withColumn("value_missing", lit(None).cast(StringType()))
# 排列列并合并 DataFrame
df_merged = df1.select("id", "value").union(df2_with_missing_cols.select("id", "value_missing", "id_missing", "label"))
df_merged.show()
结果:
+---+-----+-----------+-----+
| id|value|id_missing|label|
+---+-----+-----------+-----+
| 1| a| null| null|
| 2| b| null| null|
| 3| c| null| null|
| 4| null| 4| foo|
| 5| null| 5| bar|
| 6| null| 6| baz|
+---+-----+-----------+-----+
在这个例子中,我们首先使用 withColumn
方法为第二个 DataFrame 中缺失的列 id_missing
和 value_missing
创建了一个新列并填充为 null
,然后使用 select
方法和列名指定排列列的顺序,并使用 union
方法将两个 DataFrame 合并起来。
在 PySpark 中,合并两个具有不同列或模式的 DataFrame 是一项常见的任务。我们可以使用 union
方法,即使 DataFrame 包含不同的列,也可以轻松地将它们合并起来。如果DataFrame包含不同的列或我们需要特定的列顺序,则必须使用select
和withColumn
方法。
这是如何在 PySpark 中合并具有不同列或模式的两个 DataFrame。