Running Spark Applications

Spark applications are similar to MapReduce “jobs.” Each application is a self-contained computation that runs user-supplied code to compute a result. As with MapReduce jobs, Spark applications can make use of the resources of multiple nodes. Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop Distributed File System (HDFS) or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
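For example, the following sketch creates one RDD of each type and operates on both through the same methods. The application name and HDFS path are hypothetical:
import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch: the application name and HDFS path are hypothetical.
val sc = new SparkContext(new SparkConf().setAppName("RDDExamples"))

// Parallelized collection: distribute an existing Scala collection.
val numbers = sc.parallelize(1 to 1000)

// Hadoop dataset: one record per line of a file in HDFS (or any other
// storage system supported by Hadoop).
val lines = sc.textFile("hdfs:///user/jdoe/input.txt")

// Both kinds of RDD are operated on through the same methods.
println(numbers.map(_ * 2).count())
println(lines.filter(_.contains("error")).count())

sc.stop()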

Each application has a driver process which coordinates its execution. This process can run in the foreground (client mode) or in the background (cluster mode). Client mode is a little simpler, but cluster mode allows you to easily log out after starting a Spark application without terminating the application. In client mode, the driver runs on the machine from which you submit the application, so any input paths that refer to local files must point to files on that machine.

Spark starts executors to perform computations. There may be many executors, distributed across the cluster, depending on the size of the job. After launching the executors, Spark attempts to match tasks to executors.

CDH 5.3 introduces a performance optimization (via SPARK-1767) that causes Spark to prefer scheduling tasks on nodes where the data is already cached in HDFS. This is important enough that Spark will wait a short time for the executors near these caches to become free.

Note that Spark chooses where to start executors without taking these HDFS caches into account; if Spark does not start executors on the nodes that hold cached data, there is no further chance to select those nodes during the task-matching phase. This is not a problem for most workloads, since most workloads start executors on most or all nodes in the cluster. However, if you do have problems with the optimization, an alternate API, a SparkContext constructor marked @DeveloperApi, is provided for writing a Spark application that explicitly spells out the preferred locations to start executors. See the following snippet, as well as examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala, for a working example of using this API.
...
val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
val inputPath = args(0)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(sparkConf,
  InputFormatInfo.computePreferredLocations(
    Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
...
/**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
 * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
  this(config)
  this.preferredNodeLocationData = preferredNodeLocationData
}
Spark can run in two modes:
  • Standalone mode:

    In standalone mode, Spark uses a Master daemon which coordinates the efforts of the Workers, which run the executors. Standalone mode is the default, but it cannot be used on secure clusters.

  • YARN mode:

    In YARN mode, the YARN ResourceManager performs the functions of the Spark Master. The functions of the Workers are performed by the YARN NodeManager daemons, which run the executors. YARN mode is slightly more complex to set up, but it supports security, and provides better integration with YARN’s cluster-wide resource management policies.

Multiple Spark applications can run at once. If you run Spark on YARN, you can decide on an application-by-application basis whether to run in YARN client mode or cluster mode. When you run Spark in client mode, the driver process runs locally; in cluster mode, it runs remotely inside a YARN ApplicationMaster.

The following sections use a sample application, SparkPi, which is packaged with Spark and computes an approximation of Pi, to illustrate the three modes.

Configuration

The easiest way to configure Spark is by setting $SPARK_HOME/conf/spark-defaults.conf.

This file contains lines in the form “key value”. You can create a comment by putting a hash mark (#) at the beginning of a line.
Here is an example of a spark-defaults.conf file:
spark.master     spark://mysparkmaster.cloudera.com:7077
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs:///user/spark/eventlog
# Set spark executor memory
spark.executor.memory     2g
spark.logConf             true
It is a good idea to put configuration keys that you want to use for every application into spark-defaults.conf. See The Spark-Submit Script for more information about configuration keys.
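Configuration keys can also be set programmatically on the SparkConf that the application passes to its SparkContext; values set this way take precedence over spark-defaults.conf. A minimal sketch, with an illustrative application name and settings:
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; settings made on SparkConf in application code
// override the same keys in spark-defaults.conf.
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.executor.memory", "2g")
  .set("spark.eventLog.enabled", "true")
val sc = new SparkContext(conf)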

The Spark-Submit Script

You can start Spark applications with the spark-submit script, which is installed in your path when you install the spark-core package.

To run spark-submit, you need a compiled Spark application JAR. The following sections use a sample JAR, SparkPi, which is packaged with Spark. It computes an approximation to the value of Pi.

Running SparkPi in Standalone Mode

Supply the --master and --deploy-mode client arguments to run SparkPi in standalone mode:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \
--master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0.

Arguments that come after the JAR name are supplied to the application. In this case, the argument controls how good we want our approximation to Pi to be: SparkPi treats it as the number of partitions to sample over, so larger values produce a more accurate estimate.
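For reference, the estimate works roughly like the following sketch, which is a simplified version of the idea rather than the exact code shipped with Spark; the command-line argument becomes the number of partitions, and therefore the number of random samples:
import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

// Simplified sketch of a Monte Carlo Pi estimate in the style of SparkPi.
// The command-line argument (10 in the commands above) becomes the number
// of partitions to sample over.
object PiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PiSketch"))
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    // Count random points in the unit square that fall inside the unit circle.
    val count = sc.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    sc.stop()
  }
}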

Running SparkPi in YARN Client Mode

In this case, the command to run SparkPi is as follows:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \
--master yarn \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0.

Running SparkPi in YARN Cluster Mode

In this case, the command to run SparkPi is as follows:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--deploy-mode cluster \
--master yarn \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0.

The command will continue to print out status until the job finishes, or you press control-C. Terminating the spark-submit process in cluster mode does not terminate the Spark application as it does in client mode. To monitor the status of the running application, run yarn application -list.

Optimizing YARN Mode

Normally, Spark copies the Spark assembly JAR file to HDFS each time you run spark-submit, as you can see in the following sample log messages:
14/06/11 14:21:49 INFO yarn.Client: Uploading
file:/home/jdoe/spark/b2.4/examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar to 
hdfs://spark-02.example.com:6000/user/jdoe/.sparkStaging/application_1402278226964_0012/spark-examples-1.0.0-SNAPSHOT-hadoop2.4.0.jar
14/06/11 14:21:50 INFO yarn.Client: Uploading
file:/home/jdoe/spark/b2.4/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar to
hdfs://spark-02.example.com:6000/user/jdoe/.sparkStaging/application_1402278226964_0012/spark-assembly-1.0.0-SNAPSHOT-hadoop2.4.0.jar
You can avoid this copy each time by manually uploading the Spark assembly JAR file to HDFS, then setting the SPARK_JAR environment variable to that HDFS path:
hdfs dfs -mkdir -p /user/spark/share/lib
hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar \
/user/spark/share/lib/spark-assembly.jar
SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar

Building Spark Applications

Best practices when compiling your Spark applications include:
  • Building a single assembly JAR that includes all the dependencies, except those for Spark and Hadoop.
  • Excluding any Spark and Hadoop classes from the assembly JAR, because they are already on the cluster and part of the runtime classpath. In Maven, you can mark the Spark and Hadoop dependencies as provided; see the build sketch after this list.
  • Always building against the same version of Spark that you are running against, to avoid compatibility issues.

    For example, do not assume that applications compiled against Spark 0.9 will run on Spark 1.0 without recompiling. In addition, some applications compiled under Spark 0.9 or earlier will need changes to their source code to compile under Spark 1.0. Applications that compile under Spark 1.0 should compile under all future versions.
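For example, an sbt build definition can achieve the same effect as Maven's provided scope by marking the Spark and Hadoop dependencies as "provided". This is a sketch with illustrative coordinates and versions; match the versions actually running on your cluster:
// build.sbt -- the name and versions are illustrative.
name := "my-spark-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Compile against Spark and Hadoop, but do not bundle them in the assembly
  // JAR; they are already on the cluster's runtime classpath.
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"
)
The single assembly JAR itself is then typically produced with a plugin such as sbt-assembly or the Maven Shade Plugin.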

Using Spark with HBase

A common use case is to use Spark to process data which is destined for HBase, or which has been extracted from HBase. See Importing Data Into HBase.
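For example, a Spark application can read an HBase table into an RDD through HBase's TableInputFormat. The following is a minimal sketch; the table name is hypothetical, and the HBase configuration and client libraries must be available to the application:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: read an HBase table as an RDD of (row key, Result) pairs.
// The table name "web_logs" is hypothetical.
val sc = new SparkContext(new SparkConf().setAppName("HBaseRead"))
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "web_logs")

val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

println("Rows read: " + hbaseRDD.count())
sc.stop()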