📜  Apache Spark - Core Programming

📅  Last modified: 2020-12-02 05:51:39             🧑  Author: Mango


Spark Core is the base of the whole project. It provides distributed task dispatching, scheduling, and basic I/O functionalities. Spark uses a specialized fundamental data structure known as RDD (Resilient Distributed Dataset), which is a logical collection of data partitioned across machines. RDDs can be created in two ways: one is by referencing datasets in external storage systems, and the second is by applying transformations (e.g. map, filter, reducer, join) on existing RDDs.

The RDD abstraction is exposed through a language-integrated API. This simplifies programming complexity, because the way applications manipulate RDDs is similar to manipulating local collections of data.
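
As a brief, hedged sketch of these two creation routes (assuming a SparkContext named sc, as in the shell sessions later in this chapter, and an illustrative file name):

val external = sc.textFile("input.txt")                   // route 1: reference a dataset in an external storage system
val transformed = external.filter(line => line.nonEmpty)  // route 2: derive a new RDD by transforming an existing one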

Spark Shell

Spark provides an interactive shell − a powerful tool to analyze data interactively. It is available in either Scala or Python. Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop Input Formats (such as HDFS files) or by transforming other RDDs.

Open Spark Shell

The following command is used to open the Spark shell.

$ spark-shell

Create a Simple RDD

Let us create a simple RDD from a text file. Use the following command to create a simple RDD.

scala> val inputfile = sc.textFile("input.txt")

The output for the above command is

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

The Spark RDD API introduces a few Transformations and a few Actions to manipulate RDDs.

RDD Transformations

RDD transformations return a pointer to a new RDD and allow you to create dependencies between RDDs. Each RDD in the dependency chain (string of dependencies) has a function for computing its data and has a pointer (dependency) to its parent RDD.

Spark is lazy, so nothing will be executed unless you call some transformation or action that will trigger job creation and execution. Look at the following snippet of the word-count example.

Therefore, an RDD transformation is not a set of data but a step in a program (might be the only step) telling Spark how to get data and what to do with it.
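
As an illustrative sketch of this laziness (the variable names here are hypothetical), the first two lines only record lineage; the job runs when the action is called:

scala> val lines = sc.textFile("input.txt")         // transformation: no data is read yet
scala> val longLines = lines.filter(_.length > 20)  // transformation: still nothing is executed
scala> longLines.count()                            // action: triggers job creation and execution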

Given below is a list of RDD transformations; a short example sketch follows the list.

1. map(func) − Returns a new distributed dataset, formed by passing each element of the source through a function func.

2. filter(func) − Returns a new dataset formed by selecting those elements of the source on which func returns true.

3. flatMap(func) − Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

4. mapPartitions(func) − Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> ⇒ Iterator<U> when running on an RDD of type T.

5. mapPartitionsWithIndex(func) − Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) ⇒ Iterator<U> when running on an RDD of type T.

6. sample(withReplacement, fraction, seed) − Sample a fraction of the data, with or without replacement, using a given random number generator seed.

7. union(otherDataset) − Returns a new dataset that contains the union of the elements in the source dataset and the argument.

8. intersection(otherDataset) − Returns a new RDD that contains the intersection of elements in the source dataset and the argument.

9. distinct([numTasks]) − Returns a new dataset that contains the distinct elements of the source dataset.

10. groupByKey([numTasks]) − When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note − If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.

11. reduceByKey(func, [numTasks]) − When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) ⇒ V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

12. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) − When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

13. sortByKey([ascending], [numTasks]) − When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the Boolean ascending argument.

14. join(otherDataset, [numTasks]) − When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

15. cogroup(otherDataset, [numTasks]) − When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

16. cartesian(otherDataset) − When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

17. pipe(command, [envVars]) − Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

18. coalesce(numPartitions) − Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

19. repartition(numPartitions) − Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

20. repartitionAndSortWithinPartitions(partitioner) − Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
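
A short, hedged shell sketch (the sample data and variable names are illustrative, not part of the original example) chaining a few of the transformations above:

scala> val nums = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
scala> val doubled = nums.map(_ * 2)            // map: contents become 2, 4, 6, 8, 10, 12 once computed
scala> val bigOnes = doubled.filter(_ > 6)      // filter: keeps 8, 10, 12
scala> val keyed = bigOnes.map(n => (n % 4, n)) // builds (K, V) pairs: (0,8), (2,10), (0,12)
scala> val summed = keyed.reduceByKey(_ + _)    // reduceByKey: (0,20), (2,10)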

Actions

The following is a list of Actions, which return a value; a short example sketch follows the list.

1. reduce(func) − Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

2. collect() − Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

3. count() − Returns the number of elements in the dataset.

4. first() − Returns the first element of the dataset (similar to take(1)).

5. take(n) − Returns an array with the first n elements of the dataset.

6. takeSample(withReplacement, num, [seed]) − Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

7. takeOrdered(n, [ordering]) − Returns the first n elements of the RDD using either their natural order or a custom comparator.

8. saveAsTextFile(path) − Writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file.

9. saveAsSequenceFile(path) (Java and Scala) − Writes the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

10. saveAsObjectFile(path) (Java and Scala) − Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

11. countByKey() − Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

12. foreach(func) − Runs a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note − Modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
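
A matching hedged sketch of a few actions (again with illustrative data); unlike transformations, each of these triggers a job:

scala> val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
scala> nums.count()                        // returns 5
scala> nums.first()                        // returns 1
scala> nums.take(3)                        // returns Array(1, 2, 3)
scala> nums.reduce(_ + _)                  // returns 15
scala> nums.saveAsTextFile("nums-output")  // writes part files under the nums-output directory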

Programming with RDD

Let us see the implementations of a few RDD transformations and actions in RDD programming with the help of an example.

Consider a word count example − it counts each word appearing in a document. Consider the following text as an input and save it as an input.txt file in your home directory.

input.txt − input file.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Follow the procedure given below to execute the given example.

Open Spark Shell

The following command is used to open the Spark shell. Generally, Spark is built using Scala. Therefore, a Spark program runs in the Scala environment.

$ spark-shell

If the Spark shell opens successfully, then you will find the following output. Look at the last line of the output, "Spark context available as sc", which means the Spark shell has automatically created a SparkContext object with the name sc. Before starting the first step of a program, the SparkContext object should be created.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
        
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Create an RDD

First, we have to read the input file using the Spark-Scala API and create an RDD.

The following command is used for reading a file from the given location. Here, a new RDD is created with the name of the input file. The String given as an argument to the textFile("") method is the absolute path of the input file name. However, if only the file name is given, then it means the input file is in the current location.

scala> val inputfile = sc.textFile("input.txt")

Execute the Word Count Transformation

Our aim is to count the words in a file. Create a flat map for splitting each line into words (flatMap(line ⇒ line.split(" "))).

Next, read each word as a key with a value '1' (<key, value> = <word, 1>) using the map function (map(word ⇒ (word, 1))).

Finally, reduce those keys by adding the values of similar keys (reduceByKey(_ + _)).

The following command is used for executing the word count logic. After executing this, you will not find any output because this is not an action, it is a transformation: it points to a new RDD and tells Spark what to do with the given data.

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Current RDD

While working with the RDD, if you want to know about the current RDD, then use the following command. It will show you a description of the current RDD and its dependencies for debugging.

scala> counts.toDebugString

Caching the Transformations

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Use the following command to store the intermediate transformations in memory.

scala> counts.cache()
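
As a side note (a sketch, not part of the original walkthrough): cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), and persist() accepts an explicit storage level. The RDD below is illustrative, and an RDD's storage level can only be assigned once:

scala> import org.apache.spark.storage.StorageLevel
scala> val upper = inputfile.map(_.toUpperCase)     // a fresh RDD whose storage level is still unset
scala> upper.persist(StorageLevel.MEMORY_AND_DISK)  // keep in memory, spilling partitions to disk if they do not fit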

Applying the Action

Applying an action, such as saveAsTextFile, stores the result of all the transformations into a text file. The String argument for the saveAsTextFile("") method is the absolute path of the output folder. Try the following command to save the output in a text file. In the following example, the 'output' folder is in the current location.

scala> counts.saveAsTextFile("output")

Checking the Output

Open another terminal to go to the home directory (where spark is executed in the other terminal). Use the following commands for checking the output directory.

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

The following command is used to see the output from the part-00000 file.

[hadoop@localhost output]$ cat part-00000

Output

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

The following command is used to see the output from the part-00001 file.

[hadoop@localhost output]$ cat part-00001 

Output

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

Un-Persist the Storage

Before un-persisting, if you want to see the storage space that is used for this application, then use the following URL in your browser.

http://localhost:4040

You will see the following screen, which shows the storage space used for the application running on the Spark shell.

(Screenshot: Storage Space)

If you want to un-persist the storage space of a particular RDD, then use the following command.

scala> counts.unpersist()

You will see the output as follows −

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

To verify the storage space in the browser, use the following URL.

http://localhost:4040/

You will see the following screen. It shows the storage space used for the application, which is running on the Spark shell.

(Screenshot: Application Storage Space)