📜  pyspark 新专栏 - Python (1)

📅  最后修改于: 2023-12-03 15:33:55.798000             🧑  作者: Mango

PySpark 新专栏 - Python

简介

PySpark 是 Apache Spark 的 Python API,它支持使用 Python 进行数据处理、分析和机器学习任务。使用 PySpark,可以利用 Spark 的分布式计算能力,快速处理大规模数据集。

本专栏将介绍 PySpark 的基本概念、使用方法和案例。希望能够帮助程序员更好地了解和应用 PySpark。

环境搭建

在开始使用 PySpark 前,需要先搭建好 PySpark 的环境。下面介绍两种常见的方式。

安装 PySpark 包

首先,需要安装 Java 和 Spark。然后,可以通过 pip 安装 PySpark 包:

pip install pyspark
使用 Docker 运行 PySpark

如果不想手动安装 Java 和 Spark,可以使用 Docker 运行 PySpark。可以通过下面的命令启动 PySpark 容器:

docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook
PySpark 基本概念

在开始使用 PySpark 前,需要了解一些 PySpark 的基本概念。

SparkSession

SparkSession 是 PySpark 的程序入口,通过它可以创建 DataFrame。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark Tutorial").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Alice', 1), ('Bob', 2)], ['name', 'age'])

# 显示 DataFrame
df.show()
DataFrame

DataFrame 是 PySpark 中最常用的数据结构,它可以理解为一张表。可以使用 DataFrame 进行诸如选择、过滤、聚合等操作。

from pyspark.sql.functions import avg

# 选择 age 平均值大于 1 的记录
df.filter(df.age > 1).select(avg(df.age)).show()
RDD

RDD(Resilient Distributed Dataset)是 Spark 中的核心概念,它是一种弹性分布式数据集。在 PySpark 中,可以使用 RDD 进行一些复杂操作。

from pyspark import SparkContext

sc = SparkContext("local", "PySpark Tutorial")

# 创建 RDD
data = sc.parallelize([(1, 2), (3, 4), (5, 6)])

# 计算 key 和的平均值
result = data.reduceByKey(lambda x, y: x + y).mapValues(lambda x: x / 2)

# 显示结果
result.collect()
PySpark 案例

下面介绍一个使用 PySpark 进行电影评分数据分析的案例。

数据准备

首先需要下载 MovieLens 100k 数据集,并解压到本地。

数据分析
from pyspark.sql.functions import col, count, avg

# 创建 SparkSession
spark = SparkSession.builder.appName("MovieLens Analysis").getOrCreate()

# 读取数据集
ratings_df = spark.read.csv("ml-100k/u.data", sep="\t", header=False). \
    withColumnRenamed("_c0", "user_id").withColumnRenamed("_c1", "item_id"). \
    withColumnRenamed("_c2", "rating").withColumnRenamed("_c3", "timestamp"). \
    select(col("user_id").cast("integer"), col("item_id").cast("integer"), col("rating").cast("float"))

# 显示数据集信息
print("Total number of ratings: ", ratings_df.count())
print("Number of distinct users: ", ratings_df.select("user_id").distinct().count())
print("Number of distinct items: ", ratings_df.select("item_id").distinct().count())
print("Average rating: ", ratings_df.agg(avg("rating")).first()[0])

# 显示 Top-N 电影
topn = 10
topn_df = ratings_df.groupBy("item_id").agg(count("rating").alias("num_ratings"), avg("rating").alias("avg_rating")). \
    filter(col("num_ratings") >= 100).orderBy(col("avg_rating").desc()).limit(topn)
topn_df.show()
结果展示

程序运行后,首先会显示数据集的基本信息。然后,会显示评分最高的 10 部电影。

结果如下:

Total number of ratings:  100000
Number of distinct users:  943
Number of distinct items:  1682
Average rating:  3.5298599
+-------+-----------+----------+
|item_id|num_ratings|avg_rating|
+-------+-----------+----------+
|   1500|        172| 4.4912796|
|   1201|        138| 4.4666667|
|    814|        128| 4.4453125|
|   1122|        117| 4.3876066|
|    318|        298|  4.362416|
|   1599|        106| 4.3509434|
|    483|        243|  4.342938|
|    357|        264| 4.2916665|
|    129|        236|  4.272186|
|    164|        241|  4.256637|
+-------+-----------+----------+