Spark Applications

Spark applications are similar to MapReduce jobs. Each application is a self-contained computation that runs user-supplied code to compute a result. As with MapReduce jobs, Spark applications can make use of the resources of multiple hosts.

In MapReduce, the highest-level unit of computation is a job. The system loads the data, applies a map function, shuffles it, applies a reduce function, and writes it back out to persistent storage. In Spark, the highest-level unit of computation is an application, which can run multiple jobs, in sequence or in parallel. A Spark job can consist of more stages than just a single map and reduce.

To manage the job flow and schedule tasks, Spark relies on a driver process. Typically, this driver process is the same as the client process used to initiate the job, although in YARN mode, the driver can run on the cluster. In contrast, in MapReduce, the client process can terminate and the job will continue running.

A Spark application corresponds to an instance of the SparkContext class. An application can be used for a single batch job, an interactive session with multiple jobs spaced apart, or a long-lived server continually satisfying requests. Unlike MapReduce, an application will have processes, called executors, running on its behalf even when it's not running any jobs. This approach enables data storage in memory for quick access, as well as fast task startup.
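
As a rough illustration of how one application can run several jobs, the sketch below creates a single SparkContext and triggers two actions on a cached RDD. The object name, the local[2] master, and the sample data are assumptions chosen so that the example runs on one machine; on a cluster the master would normally be supplied by spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MultiJobApp {
  // Runs two jobs inside one application and returns both results.
  def run(sc: SparkContext): (Long, Int) = {
    // cache() asks the executors to keep the computed partitions in memory
    val numbers = sc.parallelize(1 to 100).map(_ * 2).cache()

    val count = numbers.count()       // action: first job, also fills the cache
    val total = numbers.reduce(_ + _) // action: second job, can reuse the cached data
    (count, total)
  }

  def main(args: Array[String]): Unit = {
    // Assumed settings for a local test run
    val conf = new SparkConf().setAppName("MultiJobApp").setMaster("local[2]")
    val sc = new SparkContext(conf) // one SparkContext corresponds to one application
    try {
      val (count, total) = run(sc)
      println(s"count=$count total=$total")
    } finally {
      sc.stop() // ends the application and releases its executors
    }
  }
}
```

Because the executors stay alive between the two actions, the second job can read the cached partitions instead of recomputing them.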

Spark Terminology

resilient distributed dataset (RDD)
The core programming abstraction in Spark, consisting of a fault-tolerant collection of elements that can be operated on in parallel.
partition
A subset of the elements in an RDD. Partitions define the unit of parallelism; Spark processes elements within a partition in sequence and multiple partitions in parallel. When Spark reads a file from HDFS, it creates one partition per input split. For an uncompressed text file, this usually means one partition per HDFS block, although the boundary between partitions falls on a line break rather than exactly on the block boundary. A text file compressed with a non-splittable codec such as gzip yields a single partition for the whole file.
application
A job, a sequence of jobs, a long-running service that issues new commands as needed, or an interactive exploration session.
application JAR
A JAR containing a Spark application. In some cases you can use an "Uber" JAR containing your application along with its dependencies. The JAR should never include Hadoop or Spark libraries, because these are added at runtime.
cluster manager
An external service for acquiring resources on the cluster: Spark Standalone or YARN.
job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action.
task
A unit of work on a partition of a distributed dataset. A stage is a set of tasks that run the same computation on different partitions.
driver
Process that represents the application session. The driver is responsible for converting the application to a directed graph of individual steps to execute on the cluster. There is one driver per application.
executor
A process that serves a Spark application. An executor runs multiple tasks over its lifetime, and multiple tasks concurrently. A host can run several Spark executors, and the executors of one application are typically spread across many hosts.
deploy mode
Identifies where the driver process runs. In client mode, the submitter launches the driver outside of the cluster. In cluster mode, the framework launches the driver inside the cluster. Client mode is simpler, but cluster mode allows you to log out after starting a Spark application without terminating the application.
Spark Standalone
A model of running Spark applications in which a Master daemon coordinates the efforts of Worker daemons, which run the executors.
Spark on YARN
A model of running Spark applications in which the YARN ResourceManager performs the functions of the Spark Master. The functions of the Workers are performed by the YARN NodeManagers, which run the executors.
ApplicationMaster
A YARN role responsible for negotiating resource requests made by the driver and finding a set of containers in which to run the Spark application. There is one ApplicationMaster per application.
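
To tie several of these terms together, here is a minimal sketch, assuming Spark 1.3 or later and a local master. It builds an RDD with an explicit number of partitions and runs one job over it. The object name and the sample words are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TerminologySketch {
  // Returns the partition count of the input RDD and the per-word counts.
  def run(sc: SparkContext): (Int, Map[String, Int]) = {
    // An RDD with 4 partitions: each stage runs one task per partition
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 4)

    // reduceByKey shuffles data between partitions, which splits the job into two stages
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // collect() is an action, so this line is what spawns the job
    (words.partitions.length, counts.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    // Assumed settings for a local test run; the driver is this JVM
    val conf = new SparkConf().setAppName("TerminologySketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (numPartitions, counts) = run(sc)
      println(s"partitions=$numPartitions counts=$counts")
    } finally {
      sc.stop()
    }
  }
}
```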

Configuring Spark Applications

You can specify application configuration parameters as follows:
  • Pass them directly to the SparkConf that is used to create the SparkContext in your Spark application; for example:
    val conf = new SparkConf().set("spark.dynamicAllocation.initialExecutors", "5")
    val sc = new SparkContext(conf)
  • To avoid hard-coding them in the application:
    • Pass them using the --conf command-line switch; for example:
      spark-submit \
          --class com.cloudera.example.YarnExample \
          --master yarn \
          --deploy-mode cluster \
          --conf "spark.eventLog.dir=hdfs:///user/spark/eventlog" \
          lib/yarn-example.jar \
          10
    • Specify them in spark-defaults.conf. This file contains lines in the form: "property value". You can create a comment by putting a hash mark ( # ) at the beginning of a line. You cannot add comments to the end or middle of a line.
      Here is an example of a spark-defaults.conf file:
      spark.master     spark://mysparkmaster.acme.com:7077
      spark.eventLog.enabled    true
      spark.eventLog.dir        hdfs:///user/spark/eventlog
      # Set spark executor memory
      spark.executor.memory     2g
      spark.logConf             true
      It is a good idea to put configuration properties that you want to use for every application into spark-defaults.conf. See Application Properties for more information.
Configuration parameters take precedence in the following order, highest first:
  1. Parameters passed to SparkConf.
  2. Arguments passed to spark-submit, spark-shell, or pyspark.
  3. Properties set in spark-defaults.conf.
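
The effect of this ordering can be sketched in a few lines. In the example below, Java system properties stand in for values supplied from outside the application code. This is an assumption made to keep the example self-contained, and the property values are arbitrary. A value set explicitly on the SparkConf is expected to win over an externally supplied one.

```scala
import org.apache.spark.SparkConf

object ConfPrecedenceSketch {
  // Returns the effective values of the two example properties.
  def effectiveValues(): (String, String) = {
    // Stand-ins for values supplied from outside the application code
    System.setProperty("spark.executor.memory", "2g")
    System.setProperty("spark.logConf", "true")

    // new SparkConf() loads spark.* system properties; an explicit set() overrides them
    val conf = new SparkConf().set("spark.executor.memory", "4g")

    (conf.get("spark.executor.memory"), conf.get("spark.logConf"))
  }

  def main(args: Array[String]): Unit = {
    val (memory, logConf) = effectiveValues()
    println(s"spark.executor.memory=$memory") // the value set in code
    println(s"spark.logConf=$logConf")        // the value supplied externally
  }
}
```

Setting a property in code therefore makes it impossible to override at submit time, which is one reason to prefer --conf or spark-defaults.conf for values that vary between runs.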

For more information, see Running Spark on YARN and Spark Configuration.

Specifying Properties in spark-defaults.conf Using Cloudera Manager

Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)

  1. Go to the Spark service.
  2. Click the Configuration tab.
  3. Select Scope > Spark Service_Name Service-Wide.
  4. Select Category > Advanced.
  5. Locate the Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-defaults.conf property.
  6. Specify properties described in Application Properties.

    If more than one role group applies to this configuration, edit the value for the appropriate role group. See Modifying Configuration Properties.

  7. Click Save Changes to commit the changes.
  8. Deploy the client configuration.

Specifying Properties in spark-defaults.conf Using the Command Line

To specify properties in spark-defaults.conf using the command line, edit the file SPARK_HOME/conf/spark-defaults.conf.

Building Spark Applications

Best practices when building Spark applications include:
  • Compiling against the same version of Spark that you are running against, to avoid compatibility issues. For example, do not assume that applications compiled against Spark 0.9 will run on Spark 1.0 without recompiling. In addition, some applications compiled under Spark 0.9 or earlier will need changes to their source code to compile under Spark 1.0. Applications that compile under Spark 1.0 should compile under all future versions. "Uber" JARs must always be built against the version of Spark you intend to run (see Apache Spark Known Issues).
  • Building a single assembly JAR that includes all the dependencies. Exclude Spark and Hadoop classes from the assembly JAR, because they are already on the cluster, and part of the runtime classpath. In Maven, you can mark the Spark and Hadoop dependencies as provided.
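
For projects built with sbt rather than Maven, the same idea might look like the build definition below. This is a sketch only: the project name and all version numbers are placeholders to be replaced with the versions running on your cluster, and building the assembly JAR itself assumes that the sbt-assembly plugin is configured separately.

```scala
// build.sbt: an sbt build definition, written in Scala
name := "yarn-example"

// Placeholder versions (assumptions): match these to your cluster
scalaVersion := "2.10.4"

val sparkVersion = "1.3.0"

val hadoopVersion = "2.6.0"

libraryDependencies ++= Seq(
  // "provided" scope: available at compile time, left out of the assembly JAR
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided"
)
```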

Enabling Fault-Tolerant Processing in Spark Streaming

If the driver host for a Spark Streaming application fails, the application can lose data that has been received but not yet processed. To ensure that no data is lost, Spark can write out incoming data to HDFS as it is received and use this data to recover state in the event of a failure. This feature, called Spark Streaming recovery, was introduced in CDH 5.3 as a Beta feature. Spark Streaming recovery is supported for production use in CDH 5.4.

To enable Spark Streaming recovery:
  1. Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object:
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  2. Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Use the getOrCreate method in StreamingContext to either create a new context or recover from an old context from the checkpoint directory:
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Function to create and set up a new StreamingContext.
    // checkpointDirectory is assumed to be defined elsewhere in the application.
    def functionToCreateContext(): StreamingContext = {
        val sparkConf = new SparkConf()
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf,...)   // new context

        val kafkaStream = KafkaUtils.createStream(...)
        // Do some transformations on the stream and write it out, etc.

        ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
        ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

For further information, see Checkpointing.
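
The steps for enabling Spark Streaming recovery can also be put together into a small, complete application. The sketch below is one possible arrangement and not the only one. It swaps the Kafka stream for a socket receiver so that it needs no Kafka dependency. The checkpoint path, host, port, and batch interval are hypothetical values.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableStream {
  // Hypothetical checkpoint location
  val checkpointDirectory = "hdfs:///user/spark/checkpoint"

  // Builds a new context; called only when no checkpoint data exists.
  def createContext(checkpointDir: String): StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("RecoverableStream")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // A socket receiver stands in for the Kafka stream (assumed host and port)
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print() // print the number of lines in each batch

    ssc.checkpoint(checkpointDir) // set checkpoint directory
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from checkpoint data if present, otherwise create a new context
    val context = StreamingContext.getOrCreate(
      checkpointDirectory, () => createContext(checkpointDirectory))
    context.start()            // begin receiving and processing data
    context.awaitTermination() // block until the stream is stopped or fails
  }
}
```

The stream and its transformations are defined inside createContext so that they can be restored from the checkpoint data when the driver restarts.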

To prevent data loss if a receiver fails, receivers must be able to replay data from the original data sources if required.
  • The Kafka receiver automatically replays data if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
  • The receiver-less Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter, and can function without data loss even without Streaming recovery.
  • Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
See Spark Streaming + Kafka Integration Guide and Spark Streaming + Flume Integration Guide.
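
For the receiver-less approach, a minimal sketch might look like the following. It assumes the spark-streaming-kafka artifact for Kafka 0.8 (available from Spark 1.3) is on the classpath. The broker addresses, topic name, and batch interval are hypothetical.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {
  // Hypothetical broker list and topic name
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
  val topics = Set("events")

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // No receiver is created, so the write-ahead log parameter is not set here
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    messages.map(_._2).count().print() // _2 is the message value; _1 is the key
    ssc.start()
    ssc.awaitTermination()
  }
}
```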

Using Spark with HBase

A common use case is to use Spark to process data that is destined for HBase. See Importing Data Into HBase.