📜  在 PySpark 中合并两个数据帧

📅  最后修改于: 2022-05-13 01:54:45.855000             🧑  作者: Mango

在 PySpark 中合并两个数据帧

在本文中,我们将学习如何在 PySpark 中按行合并多个数据帧。外部链接联合这是对 DataFrame 执行此操作的唯一方法。使用的模块是 pyspark :

Spark(Apache 的开源大数据处理引擎)是一个集群计算系统。与其他集群计算系统(如 Hadoop)相比,它速度更快。它提供了Python、Scala 和Java的高级 API。并行作业在 Spark 中很容易编写。我们将介绍PySpark (Python + Apache Spark),因为这将使学习曲线更平坦。要在 linux 系统上安装 Spark,请按照此操作。要在多集群系统中运行 Spark,请按照此操作

为了完成我们的任务,我们为所有输入数据帧定义了一个递归调用的函数,并将其一个一个地结合起来。为了联合,我们使用 pyspark 模块:

  • Dataframe union()DataFrame 的 union()方法用于混合具有等效结构/模式的两个 DataFrame。如果模式不等价,则返回错误。
  • DataFrame unionAll() – unionAll() 自 Spark “2.0.0” 版本起已弃用,并替换为 union()。

注意:在其他 SQL 中,Union 消除了重复项,但 UnionAll 合并了两个数据集,包括重复记录。但是,在 spark 中,它们的行为是等效的,并使用 DataFrame 重复函数来删除重复的行。

在最后一次调用时,它返回所需的结果数据帧。以下代码代表了我们对给定问题的解决方案背后的逻辑。



Python3
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
  
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
  
unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)


Python3
import functools
  
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)


Python3
# import modules
from pyspark.sql import SparkSession
import functools
  
# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
  
  
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b'])
  
# different column order.
df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a'])
df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a'])
  
unioned_df = unionAll([df1, df2, df3])
unioned_df.show()


Python3
# import modules
from functools import reduce  
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
  
# explicit functions
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
  
  
spark = SparkSession.builder.getOrCreate()
  
df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b'])
  
# different column order.
df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a'])
df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a'])
  
unionAll(*[df1, df2, df3]).show()


发生的事情是它将您传递的所有对象作为参数并减少它们的使用 unionAll (此限制来自Python,不再是 Spark 最小化,即使它们的工作方式相似),迟早会将其减少为一个 DataFrame。

如果 DataFrames 是普通的 RDDs,你可以绕过它们的列表到你的 SparkContext 的联合功能

例子:

有时,当要组合的数据框没有相同的列顺序时,最好使用 df2.select(df1.columns) 以确保两个 df 在合并之前具有相同的列顺序。

蟒蛇3

import functools
  
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

reduce(fun,seq)函数用于将在其参数中传递的特定函数应用于传递的序列中提到的所有列表元素。该函数在functools模块中定义。

现在,让我们借助一些示例来了解整个过程。

示例 1:



在这个例子中,我们用一些随机值的列 'a' 和 'b' 创建数据帧,并将所有这三个数据帧传递给我们上面创建的方法 unionAll() 并将结果数据帧作为输出并显示结果。

蟒蛇3

# import modules
from pyspark.sql import SparkSession
import functools
  
# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
  
  
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b'])
  
# different column order.
df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a'])
df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a'])
  
unioned_df = unionAll([df1, df2, df3])
unioned_df.show()

输出:

示例 2:

在这个例子中,我们创建了包含一些随机值的列 'a' 和 'b' 的数据帧,并将所有这三个数据帧传递给我们新创建的方法 unionAll(),在该方法中我们不关注列的名称。我们只是将输入数据帧合并到下一个数据帧,并将结果数据帧作为输出并显示结果。

蟒蛇3

# import modules
from functools import reduce  
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
  
# explicit functions
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
  
  
spark = SparkSession.builder.getOrCreate()
  
df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b'])
  
# different column order.
df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a'])
df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a'])
  
unionAll(*[df1, df2, df3]).show()

输出: