📅  最后修改于: 2023-12-03 15:33:55.798000             🧑  作者: Mango
PySpark 是 Apache Spark 的 Python API,它支持使用 Python 进行数据处理、分析和机器学习任务。使用 PySpark,可以利用 Spark 的分布式计算能力,快速处理大规模数据集。
本专栏将介绍 PySpark 的基本概念、使用方法和案例。希望能够帮助程序员更好地了解和应用 PySpark。
在开始使用 PySpark 前,需要先搭建好 PySpark 的环境。下面介绍两种常见的方式。
首先,需要安装 Java 和 Spark。然后,可以通过 pip 安装 PySpark 包:
pip install pyspark
如果不想手动安装 Java 和 Spark,可以使用 Docker 运行 PySpark。可以通过下面的命令启动 PySpark 容器:
docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook
在开始使用 PySpark 前,需要了解一些 PySpark 的基本概念。
SparkSession 是 PySpark 的程序入口,通过它可以创建 DataFrame。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark Tutorial").getOrCreate()
# 创建 DataFrame
df = spark.createDataFrame([('Alice', 1), ('Bob', 2)], ['name', 'age'])
# 显示 DataFrame
df.show()
DataFrame 是 PySpark 中最常用的数据结构,它可以理解为一张表。可以使用 DataFrame 进行诸如选择、过滤、聚合等操作。
from pyspark.sql.functions import avg
# 选择 age 平均值大于 1 的记录
df.filter(df.age > 1).select(avg(df.age)).show()
RDD(Resilient Distributed Dataset)是 Spark 中的核心概念,它是一种弹性分布式数据集。在 PySpark 中,可以使用 RDD 进行一些复杂操作。
from pyspark import SparkContext
sc = SparkContext("local", "PySpark Tutorial")
# 创建 RDD
data = sc.parallelize([(1, 2), (3, 4), (5, 6)])
# 计算 key 和的平均值
result = data.reduceByKey(lambda x, y: x + y).mapValues(lambda x: x / 2)
# 显示结果
result.collect()
下面介绍一个使用 PySpark 进行电影评分数据分析的案例。
首先需要下载 MovieLens 100k 数据集,并解压到本地。
from pyspark.sql.functions import col, count, avg
# 创建 SparkSession
spark = SparkSession.builder.appName("MovieLens Analysis").getOrCreate()
# 读取数据集
ratings_df = spark.read.csv("ml-100k/u.data", sep="\t", header=False). \
withColumnRenamed("_c0", "user_id").withColumnRenamed("_c1", "item_id"). \
withColumnRenamed("_c2", "rating").withColumnRenamed("_c3", "timestamp"). \
select(col("user_id").cast("integer"), col("item_id").cast("integer"), col("rating").cast("float"))
# 显示数据集信息
print("Total number of ratings: ", ratings_df.count())
print("Number of distinct users: ", ratings_df.select("user_id").distinct().count())
print("Number of distinct items: ", ratings_df.select("item_id").distinct().count())
print("Average rating: ", ratings_df.agg(avg("rating")).first()[0])
# 显示 Top-N 电影
topn = 10
topn_df = ratings_df.groupBy("item_id").agg(count("rating").alias("num_ratings"), avg("rating").alias("avg_rating")). \
filter(col("num_ratings") >= 100).orderBy(col("avg_rating").desc()).limit(topn)
topn_df.show()
程序运行后,首先会显示数据集的基本信息。然后,会显示评分最高的 10 部电影。
结果如下:
Total number of ratings: 100000
Number of distinct users: 943
Number of distinct items: 1682
Average rating: 3.5298599
+-------+-----------+----------+
|item_id|num_ratings|avg_rating|
+-------+-----------+----------+
| 1500| 172| 4.4912796|
| 1201| 138| 4.4666667|
| 814| 128| 4.4453125|
| 1122| 117| 4.3876066|
| 318| 298| 4.362416|
| 1599| 106| 4.3509434|
| 483| 243| 4.342938|
| 357| 264| 4.2916665|
| 129| 236| 4.272186|
| 164| 241| 4.256637|
+-------+-----------+----------+