Configuring and Running Spark (Standalone Mode)
Configuring Spark
- Edit the following portion of /etc/spark/conf/spark-env.sh to point to the host where the Spark Master runs:
###
### === IMPORTANT ===
### Change the following to specify a real cluster's Master host
###
export STANDALONE_SPARK_MASTER_HOST=`hostname`
Change 'hostname' in the last line to the actual hostname of the host where the Spark Master will run.
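For example, if the Master will run on a host named spark-master.example.com (a hypothetical hostname used here for illustration), the line would become:
export STANDALONE_SPARK_MASTER_HOST=spark-master.example.com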
You can change other elements of the default configuration by modifying /etc/spark/conf/spark-env.sh, including the following (a combined example appears after this list):
- SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
- SPARK_WORKER_CORES, to set the number of cores to use on this machine
- SPARK_WORKER_MEMORY, to set how much memory to use (for example, 1000m or 2g)
- SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
- SPARK_WORKER_INSTANCES, to set the number of worker processes per node
- SPARK_WORKER_DIR, to set the working directory of worker processes
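For example, a spark-env.sh fragment that overrides a few of these settings might look like the following; the values shown are illustrative assumptions, not recommendations:
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_DIR=/var/run/spark/work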
Configuring the Spark History Server
- Create the /user/spark/applicationHistory directory in HDFS and set its owner and permissions:
$ sudo -u hdfs hadoop fs -mkdir /user/spark
$ sudo -u hdfs hadoop fs -mkdir /user/spark/applicationHistory
$ sudo -u hdfs hadoop fs -chown -R spark:spark /user/spark
$ sudo -u hdfs hadoop fs -chmod 1777 /user/spark/applicationHistory
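To verify the result of this step (an optional check, not part of the original procedure), list the new directory:
$ sudo -u hdfs hadoop fs -ls /user/spark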
- Create /etc/spark/conf/spark-defaults.conf on the Spark client:
cp /etc/spark/conf/spark-defaults.conf.template /etc/spark/conf/spark-defaults.conf
- Add the following to /etc/spark/conf/spark-defaults.conf:
spark.eventLog.dir=/user/spark/applicationHistory
spark.eventLog.enabled=true
In addition, if you want the YARN ResourceManager to link directly to the Spark History Server, you can set the spark.yarn.historyServer.address property in /etc/spark/conf/spark-defaults.conf:
spark.yarn.historyServer.address=http://HISTORY_HOST:HISTORY_PORT
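Putting these settings together, a minimal /etc/spark/conf/spark-defaults.conf might look like the following; the History Server host and port shown are hypothetical placeholders, so substitute your own values:
spark.eventLog.dir=/user/spark/applicationHistory
spark.eventLog.enabled=true
spark.yarn.historyServer.address=http://history-server.example.com:18088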
Starting, Stopping, and Running Spark
- To start Spark, proceed as follows:
- On one node in the cluster, start the Spark Master:
$ sudo service spark-master start
- On all the other nodes, start the workers:
$ sudo service spark-worker start
- On one node, start the History Server:
$ sudo service spark-history-server start
- To stop Spark, use the following commands on the appropriate hosts:
$ sudo service spark-worker stop
$ sudo service spark-master stop
$ sudo service spark-history-server stop
Service logs are stored in /var/log/spark.
You can use the web UI for the Spark Master at http://<master_host>:18080.
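As a quick health check (a suggested sketch, not part of the documented procedure), you can query each service's init script and list the log directory:
$ sudo service spark-master status
$ sudo service spark-worker status
$ sudo service spark-history-server status
$ ls /var/log/spark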
Testing the Spark Service
To test the Spark service, start spark-shell on one of the nodes. You can, for example, run a word count application:
val file = sc.textFile("hdfs://namenode:8020/path/to/input")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://namenode:8020/output")
You can see the Spark Shell application, its executors, and its logs in the Spark Master UI, by default at http://spark-master:18080.
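If spark-shell does not pick up the standalone Master from the packaged configuration, you can point it at the Master explicitly; a minimal sketch, assuming the Master runs on a host named spark-master and listens on the default port 7077:
$ spark-shell --master spark://spark-master:7077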
Running Spark Applications
For details on running Spark applications in the YARN Client and Cluster modes, see Running Spark Applications.
Enabling Fault-Tolerant Processing in Spark Streaming
If the driver node for a Spark Streaming application fails, it can lose data that has been received, but not yet processed. To ensure that no data is lost, Spark can write out incoming data to HDFS as it is received and use this data to recover state in the event of a failure. This feature, called Spark Streaming recovery, is introduced in CDH 5.3 as a Beta feature. Spark Streaming recovery is not supported for production use in CDH 5.3.
- To enable Spark Streaming recovery, set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object used to instantiate the StreamingContext:
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
- Next, create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
- Finally, use the getOrCreate method in StreamingContext to either create a new context or recover one from the checkpoint directory. The following example shows steps 2 and 3 of this procedure.
// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val sparkConf = new SparkConf()
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(sparkConf, ...)   // new context
  val kafkaStream = KafkaUtils.createStream(...)
  // Do some transformations on the stream ... and write it out, etc.
  ssc.checkpoint(checkpointDirectory)              // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
To prevent data loss if a receiver fails, the receivers used must be able to replay data from the original data sources if required. The Kafka receiver replays data automatically when the spark.streaming.receiver.writeAheadLog.enable parameter is set to true, and both of the Flume receivers packaged with Spark also replay data automatically on receiver failure.
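As a minimal sketch of creating such a replayable Kafka input stream, the ZooKeeper quorum, consumer group, and topic map below are hypothetical placeholders, and the explicit storage level is an assumption rather than a setting taken from this guide:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical connection details; replace with your own.
val kafkaStream = KafkaUtils.createStream(
  ssc,                                // the StreamingContext created above
  "zk1.example.com:2181",             // ZooKeeper quorum (placeholder)
  "spark-streaming-group",            // consumer group id (placeholder)
  Map("events" -> 1),                 // topic -> number of receiver threads (placeholder)
  StorageLevel.MEMORY_AND_DISK_SER)   // assumed storage level when the write-ahead log is enabled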