Enabling fault-tolerant processing in Spark Streaming
For long-running Spark Streaming jobs, configure the maximum number of application attempt failures that YARN allows within a given time period. For example, to allow 3 failures per hour, set the following parameters (in spark-defaults.conf or when submitting the job):

spark.yarn.maxAppAttempts=3
spark.yarn.am.attemptFailuresValidityInterval=1h
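As an illustration, these settings can also be passed with --conf at submit time. The master, deploy mode, and script name below are placeholders for your own job:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  your_streaming_job.py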
If the driver host for a Spark Streaming application fails, it can lose data that has been received but not yet processed. To ensure that no data is lost, you can use Spark Streaming recovery. Recovery uses a combination of a write-ahead log and checkpoints. Spark writes incoming data to HDFS as it is received and uses this data to recover state if a failure occurs.
To enable Spark Streaming recovery:
- Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object.
- Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
- Use the getOrCreate method in StreamingContext to either create a new context or recover one from the checkpoint directory, as in the following example:

from __future__ import print_function

import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

checkpoint = "hdfs://ns1/user/systest/checkpoint"

# Function to create and set up a new StreamingContext
def functionToCreateContext():
    sparkConf = SparkConf()
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.checkpoint(checkpoint)  # set checkpoint directory
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    ssc = StreamingContext.getOrCreate(checkpoint, lambda: functionToCreateContext())
    ssc.start()
    ssc.awaitTermination()
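The example expects a ZooKeeper quorum and a Kafka topic as arguments. A submission might look like the following sketch; the host, topic, and the way the Kafka streaming classes are placed on the classpath (for example, with --jars or --packages) depend on your environment:

spark-submit kafka_wordcount.py zkhost01.example.com:2181 wordcount-topic

On the first run the job creates the checkpoint directory; if the driver later restarts, getOrCreate rebuilds the StreamingContext from that directory instead of calling functionToCreateContext again.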
For more information, see Checkpointing in the Apache Spark documentation.
To prevent data loss if a receiver fails, receivers must be able to replay data from the original data sources if required.
- The Kafka receiver automatically replays if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
- The receiverless Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter and can function without data loss, even without Streaming recovery (see the sketch after this list).
- Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
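The following is a minimal sketch of the direct (receiverless) approach, using the same pyspark.streaming.kafka module as the example above; the broker list, topic, application name, and checkpoint path are placeholders. Because the direct stream reads from the Kafka brokers without a receiver, consumed offsets are tracked with the checkpoint rather than through a write-ahead log:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Placeholder broker list, topic, and checkpoint path; adjust for your cluster.
brokers = "broker01:9092,broker02:9092"
topic = "wordcount-topic"
checkpoint = "hdfs://ns1/user/systest/direct_checkpoint"

sc = SparkContext(appName="PythonDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
ssc.checkpoint(checkpoint)

# createDirectStream connects to the brokers directly (no receiver, no write-ahead log);
# the offsets it has consumed are saved with the checkpoint, so data can be re-read
# from Kafka after a failure.
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

For automatic driver recovery, this sketch could be combined with the getOrCreate pattern shown in the earlier example.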
