📜  Apache Spark - Deployment

📅  Last modified: 2020-12-02 05:52:19             🧑  Author: Mango


A Spark application is deployed on a cluster with spark-submit, a shell command. It works with all the supported cluster managers through a uniform interface, so you do not have to configure your application separately for each one.
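For example, the same application JAR (the wordcount.jar built in the steps below) can be sent to different cluster managers just by changing the --master value. The standalone master URL here is a hypothetical placeholder, and the exact values depend on your cluster setup −

# Run locally with two worker threads
spark-submit --class SparkWordCount --master local[2] wordcount.jar

# Run on a Spark standalone cluster (hypothetical master host)
spark-submit --class SparkWordCount --master spark://master-host:7077 wordcount.jar

# Run on Hadoop YARN in cluster deploy mode
spark-submit --class SparkWordCount --master yarn --deploy-mode cluster wordcount.jar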

Let us take the same word-count example we used earlier with shell commands. Here, we consider the same example as a Spark application.

Sample Input

The following text is the input data, stored in a file named in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Look at the following program −

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
        
      /* local = master URL; Word Count = application name;            */
      /* /usr/local/spark = Spark home; Nil = jars to distribute;       */
      /* first Map() = environment variables for the worker nodes;      */
      /* second Map() = preferred node location data (left empty here)  */
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
        
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile is an action that triggers the computation and writes the RDD to disk */
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
} 

Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.

Note − While transforming the inputRDD into the countRDD, we use flatMap() to tokenize the lines (from the text file) into words, the map() method to pair each word with a count of 1, and the reduceByKey() method to sum the counts for each word.
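To see these steps one at a time, the same pipeline can be typed into the interactive spark-shell, where each intermediate RDD gets its own name. This is only a sketch; it assumes spark-shell is started from the directory containing in.txt, and sc is the SparkContext that the shell creates automatically −

scala> val input = sc.textFile("in.txt")                    // RDD of lines
scala> val words = input.flatMap(line => line.split(" "))   // RDD of individual words
scala> val pairs = words.map(word => (word, 1))             // RDD of (word, 1) pairs
scala> val count = pairs.reduceByKey(_ + _)                 // RDD of (word, total) pairs
scala> count.collect().foreach(println)                     // action: prints the (word, count) pairs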

Use the following steps to submit this application. Execute all the steps from the spark-application directory, through the terminal.

Step 1: Download the Spark core JAR

The Spark core JAR is required for compilation. Therefore, download spark-core_2.10-1.3.0.jar (the Spark core jar) and move the JAR file from the download directory to the spark-application directory.
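If you prefer to stay in the terminal, the same artifact can be fetched from Maven Central with wget. The URL below assumes the standard Maven repository layout, and the destination path is a placeholder for wherever you created the spark-application directory −

$ wget https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.jar
$ mv spark-core_2.10-1.3.0.jar /path/to/spark-application/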

Step 2: Compile the program

Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is a Hadoop support JAR taken from the Spark library.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
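As an alternative to calling scalac by hand, the application can also be built with sbt. This is only a sketch outside the original steps; it assumes sbt is installed and that SparkWordCount.scala is placed under src/main/scala in the project directory −

// build.sbt - minimal sketch for building SparkWordCount with sbt
name := "SparkWordCount"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

Running sbt package then produces a JAR under target/scala-2.10/ that can be passed directly to spark-submit, in which case Step 3 below is not needed.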

Step 3: Create a JAR

Create a JAR file of the Spark application using the following command. Here, wordcount is the file name of the JAR file.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
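Before submitting, you can check that the compiled classes actually landed in the archive by listing its contents −

$ jar -tf wordcount.jar | grep SparkWordCount

The listing should contain SparkWordCount.class together with SparkWordCount$.class, which scalac generates for the singleton object.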

Step 4: Submit the Spark application

Submit the Spark application using the following command −

spark-submit --class SparkWordCount --master local wordcount.jar

If it executes successfully, you will find the output given below. The OK in that output is only there for user identification; it is printed by the last line of the program. If you read the output carefully, you will also notice things such as −

  • successfully started service 'sparkDriver' on port 42954
  • MemoryStore started with capacity 267.3 MB
  • Started SparkUI at http://192.168.1.217:4040
  • Added JAR file:/home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
  • Stopped Spark web UI at http://192.168.1.217:4040
  • MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!  

Step 5: Checking the output

After successful execution of the program, you will find a directory named outfile in the spark-application directory.

The following commands are used to open and check the list of files in the outfile directory.

$ cd outfile 
$ ls 
part-00000 part-00001 _SUCCESS

The command for checking the output in the part-00000 file is −

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

The command for checking the output in the part-00001 file is −

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)
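Since every output partition gets its own part file, you can also print the complete result in one go by concatenating all part files from the spark-application directory; this is plain shell, nothing Spark-specific −

$ cat outfile/part-*

This prints the contents of part-00000 followed by part-00001, exactly as listed above.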

Go through the following section to learn more about the 'spark-submit' command.

spark-submit Syntax

spark-submit [options] <app jar | python file> [app arguments]

Options

| S.No | Option | Description |
|------|--------|-------------|
| 1 | --master | spark://host:port, mesos://host:port, yarn, or local. |
| 2 | --deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). |
| 3 | --class | Your application's main class (for Java / Scala apps). |
| 4 | --name | A name of your application. |
| 5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths. |
| 6 | --packages | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. |
| 7 | --repositories | Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. |
| 8 | --py-files | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
| 9 | --files | Comma-separated list of files to be placed in the working directory of each executor. |
| 10 | --conf (prop=val) | Arbitrary Spark configuration property. |
| 11 | --properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. |
| 12 | --driver-memory | Memory for driver (e.g. 1000M, 2G) (Default: 512M). |
| 13 | --driver-java-options | Extra Java options to pass to the driver. |
| 14 | --driver-library-path | Extra library path entries to pass to the driver. |
| 15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
| 16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
| 17 | --proxy-user | User to impersonate when submitting the application. |
| 18 | --help, -h | Show this help message and exit. |
| 19 | --verbose, -v | Print additional debug output. |
| 20 | --version | Print the version of current Spark. |
| 21 | --driver-cores NUM | Cores for driver (Default: 1). |
| 22 | --supervise | If given, restarts the driver on failure. |
| 23 | --kill | If given, kills the driver specified. |
| 24 | --status | If given, requests the status of the driver specified. |
| 25 | --total-executor-cores | Total cores for all executors. |
| 26 | --executor-cores | Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
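As a closing illustration of how several of these options combine, the command below resubmits the word-count JAR with explicit resource settings. The memory and core values are arbitrary examples, and spark://master-host:7077 is a hypothetical standalone master URL −

spark-submit \
  --class SparkWordCount \
  --name "Word Count" \
  --master spark://master-host:7077 \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 2G \
  --total-executor-cores 4 \
  wordcount.jar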