Running Spark Applications
Spark applications are similar to MapReduce “jobs.” Each application is a self-contained computation that runs user-supplied code to compute a result. As with MapReduce jobs, Spark applications can make use of the resources of multiple nodes. Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop Distributed File System (HDFS) or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
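For illustration, here is a minimal sketch showing both kinds of RDDs created from the same SparkContext; the collection contents and the HDFS path are hypothetical, not taken from the packaged examples:

import org.apache.spark.{SparkConf, SparkContext}

object RddTypes {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("RddTypes"))

    // Parallelized collection: distribute an existing Scala collection.
    val numbers = sc.parallelize(1 to 1000)

    // Hadoop dataset: one element per line of a file in HDFS (path is hypothetical).
    val lines = sc.textFile("hdfs:///user/jdoe/input.txt")

    // Both types of RDDs support the same operations.
    println(numbers.map(_ * 2).count())
    println(lines.filter(_.nonEmpty).count())

    sc.stop()
  }
}

When launched with spark-submit, the master is taken from the command line or from spark-defaults.conf, as described in the sections below.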
Each application has a driver process which coordinates its execution. This process can run in the foreground (client mode) or in the background (cluster mode). Client mode is a little simpler, but cluster mode allows you to easily log out after starting a Spark application without terminating the application. In client mode, the driver runs on the local machine, so an input file path can refer to a local file; in cluster mode, input paths must refer to storage that is accessible from the cluster, such as HDFS.
Spark starts executors to perform computations. There may be many executors, distributed across the cluster, depending on the size of the job. After launching the executors, Spark attempts to match tasks to executors.
CDH 5.3 introduces a performance optimization (SPARK-1767) that causes Spark to prefer scheduling tasks on nodes that hold an HDFS-cached replica of the task's input data. This locality is important enough that Spark will wait a short time for executors on those nodes to become free. The following excerpt from the SparkHdfsLR example shows how an application can pass preferred node location data, computed from its input files, to the SparkContext when it is constructed:
...
val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
val inputPath = args(0)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(sparkConf,
  InputFormatInfo.computePreferredLocations(
    Seq(new InputFormatInfo(conf,
      classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
...
The preferred node location data is accepted by the following alternative SparkContext constructor:

/**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
 * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
  this(config)
  this.preferredNodeLocationData = preferredNodeLocationData
}
- Standalone mode:
In standalone mode, Spark uses a Master daemon which coordinates the efforts of the Workers, which run the executors. Standalone mode is the default, but it cannot be used on secure clusters.
- YARN mode:
In YARN mode, the YARN ResourceManager performs the functions of the Spark Master. The functions of the Workers are performed by the YARN NodeManager daemons, which run the executors. YARN mode is slightly more complex to set up, but it supports security, and provides better integration with YARN’s cluster-wide resource management policies.
Multiple Spark applications can run at once. If you decide to run Spark on YARN, you can decide on an application-by-application basis whether to run in YARN client mode or cluster mode. When you run Spark in client mode, the driver process runs locally; in cluster mode, it runs remotely on an ApplicationMaster.
The following sections use a sample application, SparkPi, which is packaged with Spark and computes an approximation of Pi, to illustrate the three modes.
Configuration
The easiest way to configure Spark is by setting properties in $SPARK_HOME/conf/spark-defaults.conf; for example:
spark.master            spark://mysparkmaster.cloudera.com:7077
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///user/spark/eventlog
# Set spark executor memory
spark.executor.memory   2g
spark.logConf           true

It is a good idea to put configuration keys that you want to use for every application into spark-defaults.conf. See The Spark-Submit Script for more information about configuration keys.
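The same keys can also be set per application in code through SparkConf before the SparkContext is created. A minimal sketch follows; the values shown are illustrative, and settings made this way take precedence over spark-defaults.conf:

import org.apache.spark.{SparkConf, SparkContext}

// Per-application settings; these override spark-defaults.conf for this application only.
val sparkConf = new SparkConf()
  .setAppName("MyApp")                   // application name shown in the web UI (illustrative)
  .set("spark.executor.memory", "2g")    // same key as in spark-defaults.conf
  .set("spark.eventLog.enabled", "true")
val sc = new SparkContext(sparkConf)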
The Spark-Submit Script
To run spark-submit, you need a compiled Spark application JAR. The following sections use a sample JAR, SparkPi, which is packaged with Spark. It computes an approximation to the value of Pi.
Running SparkPi in Standalone Mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

where version is, for example, 2.10-1.1.0-cdh5.2.0.
Arguments that come after the JAR name are supplied to the application. In this case, the argument (10) is the number of slices SparkPi uses to sample points, which determines how accurate the approximation of Pi will be.
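For reference, here is a heavily simplified sketch of a SparkPi-style computation. This is not the exact code shipped in spark-examples, and the number of samples per slice is an assumption:

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

object SimplePi {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SimplePi"))

    // The command-line argument becomes the number of slices (partitions).
    val slices = if (args.length > 0) args(0).toInt else 2
    val samplesPerSlice = 100000  // assumed constant; more samples give a better estimate
    val n = slices * samplesPerSlice

    // Count how many random points in the unit square fall inside the unit circle.
    val count = sc.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    sc.stop()
  }
}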
Running SparkPi in YARN Client Mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

where version is, for example, 2.10-1.1.0-cdh5.2.0.
Running SparkPi in YARN Cluster Mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

where version is, for example, 2.10-1.1.0-cdh5.2.0.
The command continues to print status until the job finishes or until you press Ctrl-C. Terminating the spark-submit process in cluster mode does not terminate the Spark application as it does in client mode. To monitor the status of the running application, run yarn application -list.
Optimizing YARN Mode
Each time you run spark-submit in YARN mode, the application JAR and the Spark assembly JAR are uploaded to HDFS, as the client output shows:

14/06/11 14:21:49 INFO yarn.Client: Uploading file:/home/jdoe/spark/b2.4/examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar to hdfs://spark-02.example.com:6000/user/jdoe/.sparkStaging/application_1402278226964_0012/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar
14/06/11 14:21:50 INFO yarn.Client: Uploading file:/home/jdoe/spark/b2.4/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar to hdfs://spark-02.example.com:6000/user/jdoe/.sparkStaging/application_1402278226964_0012/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar
Uploading the large assembly JAR on every run is wasteful. To avoid it, upload the assembly JAR to HDFS once and point the SPARK_JAR environment variable at that location:

hdfs dfs -mkdir -p /user/spark/share/lib
hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar \
    /user/spark/share/lib/spark-assembly.jar

SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar
Building Spark Applications
Best practices for building Spark applications include:
- Building a single assembly JAR that includes all the dependencies, except those for Spark and Hadoop.
- Excluding any Spark and Hadoop classes from the assembly JAR, because they are already on the cluster and part of the runtime classpath. In Maven, you can mark the Spark and Hadoop dependencies as provided (an sbt equivalent is sketched after this list).
- Always building against the same version of Spark that you are running against, to avoid compatibility issues.
For example, do not assume that applications compiled against Spark 0.9 will run on Spark 1.0 without recompiling. In addition, some applications compiled under Spark 0.9 or earlier will need changes to their source code to compile under Spark 1.0. Applications that compile under Spark 1.0 should continue to compile under later 1.x releases.
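The Maven approach is described in the list above. As an illustration, an equivalent sbt build definition marks the Spark and Hadoop dependencies as "provided" so that the assembly plugin leaves them out of the application JAR. This is a sketch; the artifact versions shown are assumptions and should match the versions running on your cluster:

// build.sbt (sketch)
name := "my-spark-app"

scalaVersion := "2.10.4"

// "provided" keeps these out of the assembly JAR; the cluster supplies them at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.1.0" % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "2.5.0" % "provided"
)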
Using Spark with HBase
A common use case is to use Spark to process data which is destined for HBase, or which has been extracted from HBase. See Importing Data Into HBase.
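As an example of the extraction side, here is a minimal sketch that reads an HBase table into an RDD through the TableInputFormat shipped with HBase. The table name is hypothetical, and the HBase configuration (hbase-site.xml) and client JARs are assumed to be available to the application:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseRead {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseRead"))

    // Standard HBase client configuration; hbase-site.xml must be on the classpath.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "mytable")  // hypothetical table name

    // Each record is a (row key, Result) pair read through the HBase TableInputFormat.
    val rows = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    println("Row count: " + rows.count())
    sc.stop()
  }
}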