spark-submit is a shell command used to deploy a Spark application on a cluster. It works with all of Spark's cluster managers through a single, uniform interface, so you do not have to configure your application separately for each of them.
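For example, once the application jar built later in this chapter (wordcount.jar) is available, the same jar can be handed to any of the supported cluster managers simply by changing the --master value. In the sketch below the host address is a placeholder, not part of this example's setup -

spark-submit --class SparkWordCount --master local wordcount.jar
spark-submit --class SparkWordCount --master spark://<master-host>:7077 wordcount.jar
spark-submit --class SparkWordCount --master yarn wordcount.jar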
Let us take the same word-count example that we used earlier with shell commands. Here, we consider the same example as a Spark application.
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Look at the following program -
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

      /* local = master URL; Word Count = application name */
      /* /usr/local/spark = Spark home; Nil = jars */
      /* Map = environment; Map = variables for worker nodes */

      /* Create an input RDD by reading the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the input RDD into the count RDD */
      val count = input.flatMap(line => line.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)

      /* saveAsTextFile is an action that triggers computation of the RDD and writes the result */
      count.saveAsTextFile("outfile")

      System.out.println("OK")
   }
}
Save the above program in a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
Note - While transforming the input RDD into the count RDD, we use flatMap() to tokenize each line (from the text file) into words, map() to pair every word with a count of 1, and reduceByKey() to add up the counts for each word.
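To see what each of these steps produces, the same pipeline can also be tried interactively in spark-shell, which already provides a SparkContext named sc. The sketch below is only for inspection on a small file such as in.txt, because collect() brings the entire result back to the driver -

val lines = sc.textFile("in.txt")
val words = lines.flatMap(line => line.split(" "))   // tokenize each line into words
val pairs = words.map(word => (word, 1))             // pair every word with the count 1
val counts = pairs.reduceByKey(_ + _)                // add up the counts for each word
counts.collect().foreach(println)                    // prints pairs such as (are,2) and (they,7)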
Use the steps given below to submit this application. Execute all the steps from the spark-application directory through the terminal.
The Spark core jar is required for compilation; therefore, download spark-core_2.10-1.3.0.jar from the Spark core jar download link and move the jar file from the download directory to the spark-application directory.
Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is the Hadoop support jar taken from the Spark library.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Create a jar file of the Spark application using the following command. Here, wordcount is the file name of the jar file.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Submit the Spark application using the following command -
spark-submit --class SparkWordCount --master local wordcount.jar
If it is executed successfully, you will find the output given below. The OK in the following output is printed by the last line of the program and is there for user identification. If you read the following output carefully, you will find other things, such as -
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
After successful execution of the program, you will find a directory named outfile in the spark-application directory.
The following commands are used to open and check the list of files in the outfile directory.
$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS
The following command is used to check the output in the part-00000 file -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)
The following command is used to check the output in the part-00001 file -
$ cat part-00001
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)
Go through the following section to know more about the 'spark-submit' command.
spark-submit [options] <app jar | python file> [app arguments]
S.No | Option | Description |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn, or local. |
2 | --deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). |
3 | --class | Your application's main class (for Java / Scala apps). |
4 | --name | A name of your application. |
5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths. |
6 | --packages | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. |
7 | --repositories | Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. |
8 | --py-files | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
9 | --files | Comma-separated list of files to be placed in the working directory of each executor. |
10 | --conf (prop=val) | Arbitrary Spark configuration property. |
11 | --properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults. |
12 | --driver-memory | Memory for driver (e.g. 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Extra Java options to pass to the driver. |
14 | --driver-library-path | Extra library path entries to pass to the driver. |
15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
17 | --proxy-user | User to impersonate when submitting the application. |
18 | --help, -h | Show this help message and exit. |
19 | --verbose, -v | Print additional debug output. |
20 | --version | Print the version of current Spark. |
21 | --driver-cores NUM | Cores for driver (Default: 1). |
22 | --supervise | If given, restarts the driver on failure. |
23 | --kill | If given, kills the driver specified. |
24 | --status | If given, requests the status of the driver specified. |
25 | --total-executor-cores | Total cores for all executors. |
26 | --executor-cores | Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
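To illustrate how several of these options combine, the following sketch submits the word-count jar from this chapter to a standalone cluster with explicit resource settings. The master URL is a placeholder, and the memory and core values are arbitrary examples rather than recommendations -

spark-submit \
  --class SparkWordCount \
  --master spark://<master-host>:7077 \
  --name "Word Count" \
  --executor-memory 2G \
  --total-executor-cores 4 \
  wordcount.jar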