Enabling fault-tolerant processing in Spark Streaming
For long-running Spark Streaming jobs, make sure to configure the maximum number of application attempts allowed within a given time period. For example, to allow 3 failures per hour, set the following parameters (in spark-defaults.conf or when submitting the job):
spark.yarn.maxAppAttempts=3
spark.yarn.am.attemptFailuresValidityInterval=1h
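Because spark.yarn.am.attemptFailuresValidityInterval only counts failures that occurred within the specified window, attempts more than an hour old no longer count against the limit, so transient failures do not eventually exhaust the attempt budget of a long-running job. A minimal submission sketch follows; the script name, ZooKeeper quorum, and topic are placeholders:

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  kafka_wordcount.py zkhost:2181 mytopic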
If the host running the driver of a Spark Streaming application fails, the application can lose data that has been received but not yet processed. To ensure that no data is lost, you can use Spark Streaming recovery. Recovery uses a combination of a write-ahead log and checkpoints: Spark writes incoming data to HDFS as it is received and uses this data to recover state if a failure occurs.
To enable Spark Streaming recovery:

- Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object.
- Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
- Use the getOrCreate method in StreamingContext to either create a new context or recover one from the checkpoint directory:

from __future__ import print_function

import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

checkpoint = "hdfs://ns1/user/systest/checkpoint"

# Function to create and set up a new StreamingContext
def functionToCreateContext():
    sparkConf = SparkConf()
    # Enable the write-ahead log so received data is persisted to HDFS
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.checkpoint(checkpoint)  # set checkpoint directory
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)

    # Recover from the checkpoint if one exists; otherwise create a new context
    ssc = StreamingContext.getOrCreate(checkpoint, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()
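To run this example, the Spark Streaming Kafka integration classes must be on the job's classpath. A hypothetical invocation, assuming a spark-streaming-kafka-assembly JAR that matches your Spark and Kafka versions (the JAR name, ZooKeeper address, and topic are placeholders):

spark-submit --jars spark-streaming-kafka-assembly.jar \
  kafka_wordcount.py zkhost:2181 mytopic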
For more information, see Checkpointing in the Apache Spark documentation.
To prevent data loss when a receiver fails, receivers must be able to replay data from the original data sources.
- The Kafka receiver automatically replays data if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
- The receiverless Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter and can function without data loss, even without Streaming recovery (see the sketch after this list).
- Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
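A minimal sketch of the direct approach, assuming the same Spark 1.x KafkaUtils API used in the example above; the broker list, topic name, and checkpoint path are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)  # 10-second batch interval
ssc.checkpoint("hdfs://ns1/user/systest/checkpoint-direct")  # hypothetical path

# Read from the Kafka brokers directly; offsets are tracked by Spark itself,
# so no receiver and no write-ahead log are involved.
directStream = KafkaUtils.createDirectStream(
    ssc, ["mytopic"], {"metadata.broker.list": "broker1:9092,broker2:9092"})

counts = directStream.map(lambda x: x[1]) \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

Because the direct stream re-reads data from Kafka itself on recovery, checkpointing the offsets is sufficient to avoid data loss; the write-ahead log adds nothing in this case.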