📜  pyspark 备忘单 (1)

📅  最后修改于: 2023-12-03 15:04:02.108000             🧑  作者: Mango

Pyspark 备忘单

简介

Pyspark是Apache Spark的Python接口,它允许使用Python创建和获取计算资源,以及操作集群上的数据。本备忘单包含一些Pyspark中常用的代码片段,希望能在日常使用中提供帮助。

初始化

要使用Pyspark,需要在代码中首先初始化SparkContext。以下是通常使用的一个示例:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("myApp").setMaster("local")
sc = SparkContext(conf=conf)

这里指定了Spark应用的名称“myApp”,并将master设置为“local”,表示在本地模式(使用单个线程)下启动Spark。

创建RDD

Pyspark中最常见的数据结构是弹性分布式数据集(Resilient Distributed Datasets,简称RDD)。以下是一些常见的创建RDD的方法。

  1. 从本地数据集创建RDD
#从文本文件创建RDD
rdd = sc.textFile("path/to/your/file.txt")

#从Python列表创建RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
  1. 通过转换已有RDD创建新的RDD
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd1.map(lambda x: x * 2)

上面的代码将创建一个包含5个元素的RDD,并对其进行map操作,生成一个新的RDD,其中每个元素都乘以2。

RDD 操作
  1. 广播变量

广播变量是一种分布式共享变量,它允许Spark程序在集群上的各个节点上缓存只读变量,从而提高性能。以下是一个示例:

# 创建一个广播变量
broadcastVar = sc.broadcast([1, 2, 3, 4, 5])

# 在RDD上使用广播变量
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.map(lambda x: x + broadcastVar.value[0])
  1. 累加器

累加器是一种分布式计数器,允许Spark程序在集群上的各个节点上更新只写变量,通常用于计算RDD中的元素数量或元素的总和等聚合操作。以下是一个示例:

# 创建一个计数器
counter = sc.accumulator(0)

# 用RDD更新计数器
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: counter.add(x))
  1. 转换操作

Pyspark中有许多常见的转换操作可以应用于RDD。

# map: 对RDD中的每个元素应用一个函数
rdd.map(lambda x: x * 2)

# filter: 返回满足条件的RDD
rdd.filter(lambda x: x > 3)

# flatMap: 对每个元素应用一个函数,并将结果展平为单个列表
rdd.flatMap(lambda x: [x, x + 1])

# groupBy: 按照指定的关键字对RDD进行分组
rdd.groupBy(lambda x: x % 2)

# reduceByKey: 按照指定的关键字对RDD进行分组,并将每组中的值聚合为单个值(通常使用lambda表达式作为聚合函数)
rdd.reduceByKey(lambda x, y: x + y)
  1. 动作操作

Pyspark中的动作操作处理RDD中的数据以返回结果。

# collect: 将整个RDD收集到驱动程序中
rdd.collect()

# take: 返回RDD中的前n个元素
rdd.take(3)

# count: 返回RDD中元素的数量
rdd.count()

# countByKey: 按照指定的关键字对RDD进行分组,并统计每组中的元素数量
rdd.countByKey()
常用函数

Pyspark中还有许多常用的函数可以用于数据操作和转换,例如:

# 将字符串转换为整数
int("123")

# 将字符串转换为小数
float("3.14")

# 将字符串转换为布尔值
bool("True")

# 对所有元素执行排序操作,返回一个新的RDD
rdd.sortBy(lambda x: x, ascending=False)

# 将两个RDD组合成一个新的RDD
rdd1.union(rdd2)

# 将RDD中的所有元素放入一个列表中
rdd.collect()

# 返回RDD中的前n个元素
rdd.take(n)

# 获取RDD中的第一个元素
rdd.first()

# 随机采样RDD中的一些元素
rdd.sample(withReplacement=False, fraction=0.5)

以上是Pyspark常用的函数,更多的函数可以在Pyspark官方文档中找到。

结论

本备忘单提供了Pyspark的一些常见用法和代码片段,包括RDD创建、广播变量和累加器、转换和动作操作等。希望这些代码片段能帮助开发人员在Pyspark中更快速、高效地开发Spark程序。