Configuring and Running Spark (Standalone Mode)

Configuring Spark

Before you can run Spark in standalone mode, you must do the following on every host in the cluster:
  • Edit the following portion of /etc/spark/conf/ to point to the host where the Spark Master runs:
    ### === IMPORTANT ===
    ### Change the following to specify a real cluster's Master host
    export STANDALONE_SPARK_MASTER_HOST=`hostname`
    Replace `hostname` (including the backticks) in the last line with the actual hostname of the host where the Spark Master will run.

You can also change other elements of the default configuration in /etc/spark/conf/, including the following:

  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000m, 2g)
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes
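
As an illustration, the settings listed above could be combined as in the following sketch. It assumes they go in Spark's environment script under /etc/spark/conf/ (the file name is not given above), and every value is an assumption to adjust for your hosts. The working directory path is hypothetical, and the variable for worker processes per node is spelled SPARK_WORKER_INSTANCES in Spark's standalone documentation.

```shell
# Illustrative values only; size them for your own hosts.
export SPARK_MASTER_PORT=7077                 # Spark's usual default Master port
export SPARK_MASTER_WEBUI_PORT=18080          # Master UI port used in this guide
export SPARK_WORKER_CORES=4                   # cores the worker may use on this machine
export SPARK_WORKER_MEMORY=2g                 # memory the worker may use
export SPARK_WORKER_INSTANCES=1               # worker processes per node
export SPARK_WORKER_DIR=/var/run/spark/work   # hypothetical working directory
```

Restart the Master and worker services after changing these values so that the new settings are read.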

Configuring the Spark History Server

Before you can run the Spark History Server, you must create the /user/spark/applicationHistory/ directory in HDFS and set ownership and permissions as follows:
$ sudo -u hdfs hadoop fs -mkdir /user/spark 
$ sudo -u hdfs hadoop fs -mkdir /user/spark/applicationHistory 
$ sudo -u hdfs hadoop fs -chown -R spark:spark /user/spark
$ sudo -u hdfs hadoop fs -chmod 1777 /user/spark/applicationHistory
On Spark clients (systems from which you intend to launch Spark jobs), do the following:
  1. Create /etc/spark/conf/spark-defaults.conf on the Spark client:
    cp /etc/spark/conf/spark-defaults.conf.template /etc/spark/conf/spark-defaults.conf
  2. Add the following to /etc/spark/conf/spark-defaults.conf:
This causes Spark applications running on this client to write their history to the directory that the history server reads.
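
The properties for step 2 are not shown above. A minimal sketch, assuming Spark's standard event-log properties and the HDFS directory created earlier, might look like the following. The namenode:8020 address is a placeholder borrowed from the examples in this guide, not a value taken from your cluster.

```
# Assumed settings for spark-defaults.conf; adjust the NameNode address.
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://namenode:8020/user/spark/applicationHistory
```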

In addition, if you want the YARN ResourceManager to link directly to the Spark History Server, you can set the spark.yarn.historyServer.address property in /etc/spark/conf/spark-defaults.conf:
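
The value for this property is not shown above. As a sketch, with the host and port left as placeholders to fill in for your History Server (check the address format that your Spark version expects):

```
# Placeholders, not real values.
spark.yarn.historyServer.address  <history_server_host>:<history_server_port>
```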


Starting, Stopping, and Running Spark

  • To start Spark, proceed as follows:
    • On one node in the cluster, start the Spark Master:
      $ sudo service spark-master start
    • On all the other nodes, start the workers:
      $ sudo service spark-worker start
    • On one node, start the History Server:
      $ sudo service spark-history-server start
  • To stop Spark, use the following commands on the appropriate hosts:
    $ sudo service spark-worker stop
    $ sudo service spark-master stop
    $ sudo service spark-history-server stop

Service logs are stored in /var/log/spark.

You can use the GUI for the Spark Master at <master_host>:18080.

Testing the Spark Service

To test the Spark service, start spark-shell on one of the nodes. You can, for example, run a word count application:

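val file = sc.textFile("hdfs://namenode:8020/path/to/input")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
// Transformations are lazy; an action such as take() triggers the job.
counts.take(10).foreach(println)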

To view the Spark Shell application, its executors, and its logs, go to the Spark Master UI, by default at http://spark-master:18080.

Running Spark Applications

For details on running Spark applications in the YARN Client and Cluster modes, see Running Spark Applications.

Enabling Fault-Tolerant Processing in Spark Streaming

If the driver node for a Spark Streaming application fails, the application can lose data that has been received but not yet processed. To ensure that no data is lost, Spark can write incoming data to HDFS as it is received and use this data to recover state after a failure. This feature, called Spark Streaming recovery, is introduced in CDH 5.3 as a Beta feature and is not supported for production use in CDH 5.3.

  1. To enable Spark Streaming recovery, set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object used to instantiate the StreamingContext:
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  2. Next, create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Finally, use the getOrCreate method in StreamingContext to either create a new context or recover an old context from the checkpoint directory. The following example shows steps 2 and 3 of this procedure.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // checkpointDirectory is assumed to be defined elsewhere (for example, an HDFS path)
    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
        val sparkConf = new SparkConf()
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf,...)   // new context
        val kafkaStream = KafkaUtils.createStream(...)
        // Do some transformations on the stream, write it out, and so on.
        ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
        ssc                                    // return the configured context
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

To prevent data loss if a receiver fails, the receivers used must be able to replay data from the original data sources when required. The Kafka receiver replays data automatically if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true. Both Flume receivers packaged with Spark also replay data automatically on receiver failure.