Enabling fault-tolerant processing in Spark Streaming

For long-running Spark Streaming jobs, make sure to configure the maximum number of allowed failures in a given time period. For example, to allow up to 3 failures per hour, set the following parameters (in spark-defaults.conf or when submitting the job):

spark.yarn.maxAppAttempts=3
spark.yarn.am.attemptFailuresValidityInterval=1h
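
These settings can also be passed directly to spark-submit at job submission time; a minimal sketch, in which the master, deploy mode, and application file name are placeholders to adapt:

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  my_streaming_app.py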

If the driver host for a Spark Streaming application fails, the application can lose data that has been received but not yet processed. To ensure that no data is lost, you can use Spark Streaming recovery. Recovery uses a combination of a write-ahead log and checkpoints: Spark writes incoming data to HDFS as it is received and uses this data to recover state if a failure occurs.

To enable Spark Streaming recovery:

  1. Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object.
  2. Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Use the getOrCreate method in StreamingContext to either create a new context or recover one from the checkpoint directory:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    checkpoint = "hdfs://ns1/user/systest/checkpoint"
    
    # Function to create and setup a new StreamingContext
    def functionToCreateContext():
    
      sparkConf = SparkConf()
      # Persist received data to the write-ahead log so it can be recovered after a failure
      sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
      sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
      ssc = StreamingContext(sc, 10)   # 10-second batch interval
    
      # Receiver-based Kafka stream: connect through ZooKeeper with one receiver thread for the topic
      zkQuorum, topic = sys.argv[1:]
      kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
      lines = kvs.map(lambda x: x[1])   # each record is a (key, message) pair; keep the message
      counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
      counts.pprint()
    
      ssc.checkpoint(checkpoint)   # set checkpoint directory
      return ssc
    
    if __name__ == "__main__":
      if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    
      # Recover the context from the checkpoint directory if one exists; otherwise build a new one
      ssc = StreamingContext.getOrCreate(checkpoint, functionToCreateContext)
      ssc.start()
      ssc.awaitTermination()
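
To run the example, the Spark Streaming Kafka integration must be on the application's classpath. A sketch of a submit command, assuming the spark-streaming-kafka-0-8 package version that matches your Spark installation (shown here as 2.3.0 for illustration; the ZooKeeper quorum and topic name are placeholders):

spark-submit --master yarn \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
  kafka_wordcount.py zkhost:2181 wordcount-topic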

For more information, see Checkpointing in the Apache Spark documentation.

To prevent data loss when a receiver fails, receivers must be able to replay data from the original data sources.

  • The Kafka receiver automatically replays data if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
  • The receiverless Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter and can function without data loss, even without Streaming recovery (see the sketch after this list).
  • Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
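
For comparison, a minimal sketch of the receiverless direct approach referenced above, using KafkaUtils.createDirectStream with the same word-count logic; the broker list and topic name are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)   # 10-second batch interval

# The direct stream tracks Kafka offsets itself instead of using a receiver,
# so spark.streaming.receiver.writeAheadLog.enable is not needed.
kvs = KafkaUtils.createDirectStream(ssc, ["wordcount-topic"],
                                    {"metadata.broker.list": "broker1:9092,broker2:9092"})

counts = kvs.map(lambda x: x[1]) \
            .flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()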