Chapter 8. Accessing Cloud Data in Spark
Datasets stored in cloud object stores can be used in Spark as if they were stored in HDFS.

All these object stores are viewed by Spark as filesystems, allowing them to be used as the source and destination of data: be it batch, SQL, DataFrame, or Spark Streaming. To load and save data in the cloud, Spark uses the same APIs that are used to load and save data in HDFS or other filesystems.
Provided the relevant libraries are on the classpath, a file stored in a Cloud Object Store can be referenced simply via a URL:
sparkContext.textFile("s3a://landsat-pds/scene_list.gz").count()
Similarly, an RDD can be saved to an object store via saveAsTextFile():
val numbers = sparkContext.parallelize(1 to 1000)

// save to Amazon S3 (or compatible implementation)
numbers.saveAsTextFile("s3a://bucket1/counts")
Example 1: DataFrames
DataFrames can read from and write to object stores using their read() and write() methods:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

val sparkConf = new SparkConf()

val spark = SparkSession
  .builder
  .appName("DataFrames")
  .config(sparkConf)
  .getOrCreate()
import spark.implicits._

val numRows = 1000

// generate test data
val sourceData = spark.range(0, numRows).select($"id".as("l"),
  $"id".cast(StringType).as("s"))

// define the destination
val dest = "adl://hortonworks-eu.azuredatalakestore.net/dataframes"

// write the data
val orcFile = dest + "/data.orc"
sourceData.write.format("orc").save(orcFile)

// now read it back
val orcData = spark.read.format("orc").load(orcFile)

// finally, write the data as Parquet
orcData.write.format("parquet").save(dest + "/data.parquet")

spark.stop()
Example 2: Spark Streaming and Cloud Storage
Spark Streaming can monitor files added to object stores by creating a FileInputDStream to monitor a path under a bucket:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(5000))
try {
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  val matches = lines.filter(_.endsWith("3"))
  matches.print()
  ssc.start()
  ssc.awaitTermination()
} finally {
  ssc.stop(true)
}
Note: The time to scan for new files is proportional to the number of files under the path, not the number of new files, so this can become a slow operation.
Checkpointing streaming data to an S3 bucket is very slow, as the stream data is (potentially) recalculated, uploaded to S3, and then renamed into the checkpoint file (the rename being a slow copy operation). If S3 is used for checkpointing, the interval between checkpoints must be long enough to allow for this slow checkpoint. WASB, ADL, and GCS all have faster rename operations, so they do not suffer from this problem.
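As a minimal sketch of such tuning, the snippet below extends the streaming example above; the checkpoint bucket path and the ten-minute interval are illustrative assumptions, not values recommended by this chapter. StreamingContext.checkpoint() sets the directory for checkpoint data, and DStream.checkpoint() controls how often an individual stream is checkpointed:

// continuing the previous example: ssc and matches are defined above,
// and both calls must be made before ssc.start()

// store checkpoint data under an object store path (bucket is illustrative)
ssc.checkpoint("s3a://bucket1/checkpoints")

// checkpoint this stream infrequently, so each slow upload-and-rename to S3
// can complete before the next checkpoint begins (interval is illustrative)
matches.checkpoint(Minutes(10))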
Related Links