Using the spark-avro Library to Access Avro Data Sources
Spark supports loading and saving DataFrames from a variety of data sources. The spark-avro library allows you to process data encoded in the Avro format using Spark. No Maven pom.xml configuration or other changes are required.
The spark-avro library supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark. The library automatically performs the schema conversion. Spark SQL reads the data and converts it to Spark's internal representation; the Avro conversion is performed only during reading and writing data.
Continue reading:
Partitioning
The spark-avro library allows you to write and read partitioned data without extra configuration. As you do when writing Parquet, simply pass the columns you want to partition by to the writer. See Writing Partitioned Data and Reading Partitioned Data.
Compression
Specify the compression used on write by setting the Spark configuration spark.sql.avro.compression.codec. The supported compression types are uncompressed, snappy, and deflate. Specify the level to use with deflate compression in spark.sql.avro.deflate.level. For an example, see Writing Deflate Compressed Records.
Avro Record Name
Specify the record name and namespace to use when writing to disk by passing recordName and recordNamespace as optional parameters. See Specifying a Record Name.
Avro to Spark SQL Conversion
The spark-avro library supports conversion for all Avro data types, except complex union types:
- boolean -> BooleanType
- int -> IntegerType
- long -> LongType
- float -> FloatType
- double -> DoubleType
- bytes -> BinaryType
- string -> StringType
- record -> StructType
- enum -> StringType
- array -> ArrayType
- map -> MapType
- fixed -> BinaryType
- union(int, long) -> LongType
- union(float, double) -> DoubleType
- union(any, null) -> any
Spark SQL to Avro Conversion
Every Spark SQL type is supported:
- BooleanType -> boolean
- IntegerType -> int
- LongType -> long
- FloatType -> float
- DoubleType -> double
- BinaryType -> bytes
- StringType -> string
- StructType -> record
- ArrayType -> array
- MapType -> map
- ByteType -> int
- ShortType -> int
- DecimalType -> string
- BinaryType -> bytes
- TimestampType -> long
Limitations
Because Spark is converting data types, keep the following in mind:
- Enumerated types are erased - Avro enumerated types become strings when they are read into Spark because Spark does not support enumerated types.
- Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write the data, the schema for the output will be different.
- Spark schema reordering - Spark reorders the elements in its schema when writing them to disk so that the elements being partitioned on are the last elements. See Writing Partitioned Data for an example.
API Examples
This section provides examples of using the spark-avro API in all supported languages.
Continue reading:
Scala Examples
The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. The spark-avro library includes avro methods in SQLContext for reading and writing Avro files:
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) // The Avro records are converted to Spark types, filtered, and // then written back out as Avro records val df = sqlContext.read.avro("input dir") df.filter("age > 5").write.avro("output dir")
You can also specify the format "com.databricks.spark.avro":
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.format("com.databricks.spark.avro").load("input dir") df.filter("age > 5").write.format("com.databricks.spark.avro").save("output dir")
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) // configuration to use deflate compression sqlContext.setConf("spark.sql.avro.compression.codec", "deflate") sqlContext.setConf("spark.sql.avro.deflate.level", "5") val df = sqlContext.read.avro("input dir") // writes out compressed Avro records df.write.avro("output dir")
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = Seq( (2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating") df.write.partitionBy("year", "month").avro("output dir")
This code outputs a directory structure like this:
-rw-r--r-- 3 hdfs supergroup 0 2015-11-03 14:58 /tmp/output/_SUCCESS drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2011 drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2011/month=7 -rw-r--r-- 3 hdfs supergroup 229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012 drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012/month=7 -rw-r--r-- 3 hdfs supergroup 231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro drwxr-xr-x - hdfs supergroup 0 2015-11-03 14:58 /tmp/output/year=2012/month=8 -rw-r--r-- 3 hdfs supergroup 246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.avro("input dir") df.printSchema() df.filter("year = 2011").collect().foreach(println)
This code automatically detects the partitioned data and joins it all, so it is treated the same as unpartitioned data. This also queries only the directory required, to decrease disk I/O.
root |-- title: string (nullable = true) |-- rating: double (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) [Git,2.0,2011,7]
import com.databricks.spark.avro._ val sqlContext = new SQLContext(sc) val df = sqlContext.read.avro("input dir") val name = "AvroTest" val namespace = "com.cloudera.spark" val parameters = Map("recordName" -> name, "recordNamespace" -> namespace) df.write.options(parameters).avro("output dir")
Java Example
Use the DataFrame API to query Avro files in Java. This example is almost identical to Scala Example with Format:
import org.apache.spark.sql.*; SQLContext sqlContext = new SQLContext(sc); // Creates a DataFrame from a file DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input dir"); // Saves the subset of the Avro records read in df.filter("age > 5").write().format("com.databricks.spark.avro").save("output dir");
Python Example
Use the DataFrame API to query Avro files in Python. This example is almost identical to Scala Example with Format:
# Creates a DataFrame from a directory df = sqlContext.read.format("com.databricks.spark.avro").load("input dir") # Saves the subset of the Avro records read in df.where("age > 5").write.format("com.databricks.spark.avro").save("output dir")