Using the spark-avro Library to Access Avro Data Sources

Spark supports loading DataFrames from, and saving them to, a variety of data sources. The spark-avro library allows you to process data encoded in the Avro format using Spark. No Maven pom.xml configuration or other changes are required.

The spark-avro library supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark. The library performs the schema conversion automatically. Spark SQL reads the data and converts it to Spark's internal representation; the Avro conversion is performed only when data is read or written.

Partitioning

The spark-avro library allows you to write and read partitioned data without extra configuration. As you do when writing Parquet, simply pass the columns you want to partition by to the writer. See Writing Partitioned Data and Reading Partitioned Data.

Compression

Specify the compression used on write by setting the Spark configuration spark.sql.avro.compression.codec. The supported compression types are uncompressed, snappy, and deflate. Specify the level to use with deflate compression in spark.sql.avro.deflate.level. For an example, see Writing Deflate Compressed Records.
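
To give a sense of what the deflate level controls, the following sketch compresses a repetitive payload at several levels using Python's standard zlib module. It does not use Spark or spark-avro; the helper names raw_deflate and raw_inflate and the sample payload are assumptions made for illustration. Avro's deflate codec is defined in terms of raw deflate (RFC 1951), which a negative wbits value selects here.

```python
import zlib


def raw_deflate(data, level):
    """Compress bytes as a raw deflate stream (no zlib header or checksum)."""
    compressor = zlib.compressobj(level, zlib.DEFLATED, -zlib.MAX_WBITS)
    return compressor.compress(data) + compressor.flush()


def raw_inflate(data):
    """Reverse raw_deflate."""
    return zlib.decompress(data, -zlib.MAX_WBITS)


# Hypothetical payload standing in for a block of serialized records.
payload = b"2012,8,Batman,9.8;" * 1000

# Lower levels favor speed and higher levels favor size; 5 is the level
# used in the Writing Deflate Compressed Records example.
sizes = {level: len(raw_deflate(payload, level)) for level in (1, 5, 9)}

for level, size in sizes.items():
    print(f"level {level}: {len(payload)} bytes -> {size} bytes")
```

The level changes only how hard the compressor works; data written at any level decompresses to the same bytes.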

Avro Record Name

Specify the record name and namespace to use when writing to disk by passing recordName and recordNamespace as optional parameters. See Specifying a Record Name.
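
In an Avro schema, a record's name and namespace are attributes of the record, and the two combine into the record's full name. The sketch below builds such a schema as a plain Python dictionary to show where recordName and recordNamespace end up. The record_schema and full_name helpers and the field list are hypothetical, and the schema that spark-avro actually writes may contain more detail.

```python
import json


def record_schema(record_name, record_namespace, fields):
    """Build a minimal Avro record schema (hypothetical helper)."""
    return {
        "type": "record",
        "name": record_name,
        "namespace": record_namespace,
        "fields": [{"name": name, "type": avro_type} for name, avro_type in fields],
    }


def full_name(schema):
    """Join namespace and name the way Avro forms a record's full name."""
    namespace = schema.get("namespace", "")
    return f"{namespace}.{schema['name']}" if namespace else schema["name"]


# Same values as the Specifying a Record Name example; the fields are made up.
schema = record_schema(
    "AvroTest",
    "com.cloudera.spark",
    [("title", "string"), ("rating", "double")],
)

print(json.dumps(schema, indent=2))
print(full_name(schema))
```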

Avro to Spark SQL Conversion

The spark-avro library supports conversion for all Avro data types, except complex union types:

  • boolean -> BooleanType
  • int -> IntegerType
  • long -> LongType
  • float -> FloatType
  • double -> DoubleType
  • bytes -> BinaryType
  • string -> StringType
  • record -> StructType
  • enum -> StringType
  • array -> ArrayType
  • map -> MapType
  • fixed -> BinaryType

The spark-avro library supports the following union types:

  • union(int, long) -> LongType
  • union(float, double) -> DoubleType
  • union(any, null) -> any

All doc, aliases, and other fields are stripped when the data is loaded into Spark.
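
The mapping above can be modeled in a few lines of plain Python. This is only a sketch of the documented rules, not the spark-avro implementation: the function spark_type_name is hypothetical, it takes an Avro schema already parsed from JSON, and it returns the Spark SQL type name as a string rather than a real Spark type.

```python
PRIMITIVE_TYPES = {
    "boolean": "BooleanType",
    "int": "IntegerType",
    "long": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "bytes": "BinaryType",
    "string": "StringType",
}

# Types identified by the "type" attribute of a schema object.
COMPLEX_TYPES = {
    "record": "StructType",
    "enum": "StringType",
    "array": "ArrayType",
    "map": "MapType",
    "fixed": "BinaryType",
}


def spark_type_name(avro_schema):
    """Return the Spark SQL type name for a parsed Avro schema."""
    if isinstance(avro_schema, str):
        return PRIMITIVE_TYPES[avro_schema]
    if isinstance(avro_schema, dict):
        kind = avro_schema["type"]
        if isinstance(kind, str) and kind in COMPLEX_TYPES:
            return COMPLEX_TYPES[kind]
        return spark_type_name(kind)  # for example {"type": "string"}
    if isinstance(avro_schema, list):
        return _union_type_name(avro_schema)
    raise TypeError(f"not an Avro schema: {avro_schema!r}")


def _union_type_name(branches):
    """Handle only the three union shapes listed above."""
    non_null = [branch for branch in branches if branch != "null"]
    if len(branches) == 2 and len(non_null) == 1:
        return spark_type_name(non_null[0])  # union(any, null) -> any
    if all(isinstance(branch, str) for branch in branches):
        if set(branches) == {"int", "long"}:
            return "LongType"
        if set(branches) == {"float", "double"}:
            return "DoubleType"
    raise ValueError(f"unsupported union: {branches!r}")


print(spark_type_name(["null", "string"]))
print(spark_type_name({"type": "enum", "name": "Suit", "symbols": ["HEARTS"]}))
```

Any other union raises ValueError in this sketch, which mirrors the statement that complex union types are not supported.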

Spark SQL to Avro Conversion

Every Spark SQL type is supported:

  • BooleanType -> boolean
  • IntegerType -> int
  • LongType -> long
  • FloatType -> float
  • DoubleType -> double
  • BinaryType -> bytes
  • StringType -> string
  • StructType -> record
  • ArrayType -> array
  • MapType -> map
  • ByteType -> int
  • ShortType -> int
  • DecimalType -> string
  • TimestampType -> long
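
Several Spark SQL types share an Avro type, so writing data and reading it back does not always return the original type. The following sketch makes that visible by composing the two mappings from this page as plain Python dictionaries. It is an illustration under that assumption only; round_trip is a hypothetical helper, and the dictionaries hold type names, not real Spark or Avro objects.

```python
SPARK_TO_AVRO = {
    "BooleanType": "boolean",
    "IntegerType": "int",
    "LongType": "long",
    "FloatType": "float",
    "DoubleType": "double",
    "BinaryType": "bytes",
    "StringType": "string",
    "StructType": "record",
    "ArrayType": "array",
    "MapType": "map",
    "ByteType": "int",
    "ShortType": "int",
    "DecimalType": "string",
    "TimestampType": "long",
}

# Subset of the Avro to Spark SQL mapping that the round trip needs.
AVRO_TO_SPARK = {
    "boolean": "BooleanType",
    "int": "IntegerType",
    "long": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "bytes": "BinaryType",
    "string": "StringType",
    "record": "StructType",
    "array": "ArrayType",
    "map": "MapType",
}


def round_trip(spark_type):
    """Type name seen after writing a column to Avro and reading it back."""
    return AVRO_TO_SPARK[SPARK_TO_AVRO[spark_type]]


changed = sorted(t for t in SPARK_TO_AVRO if round_trip(t) != t)
for spark_type in changed:
    print(f"{spark_type} -> {SPARK_TO_AVRO[spark_type]} -> {round_trip(spark_type)}")
```

Under these mappings, ByteType, ShortType, DecimalType, and TimestampType columns come back as a different type.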

Limitations

Because Spark is converting data types, keep the following in mind:

  • Enumerated types are erased - Avro enumerated types become strings when they are read into Spark because Spark does not support enumerated types.
  • Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write the data, the schema for the output will be different.
  • Spark schema reordering - Spark reorders the elements in its schema when writing them to disk so that the elements being partitioned on are the last elements. See Writing Partitioned Data for an example.
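
The schema reordering can be pictured with a short sketch. It assumes, consistent with the Reading Partitioned Data output on this page, that the partition columns are moved to the end in the order they were passed to the writer; reorder_for_partitioning is a hypothetical helper and not part of Spark.

```python
def reorder_for_partitioning(columns, partition_columns):
    """Move the partition columns to the end, keeping the other columns in order."""
    data_columns = [name for name in columns if name not in partition_columns]
    return data_columns + list(partition_columns)


# Column order from the Writing Partitioned Data example.
written = ["year", "month", "title", "rating"]

print(reorder_for_partitioning(written, ["year", "month"]))
```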

API Examples

This section provides examples of using the spark-avro API in all supported languages.

Scala Examples

The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. Importing com.databricks.spark.avro._ adds avro methods to the reader and writer returned by sqlContext.read and df.write:

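Scala Example with Function

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

// sc is an existing SparkContext, such as the one that spark-shell provides
val sqlContext = new SQLContext(sc)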

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("input dir")
df.filter("age > 5").write.avro("output dir")

You can also specify the format "com.databricks.spark.avro":

Scala Example with Format

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")

df.filter("age > 5").write.format("com.databricks.spark.avro").save("output dir")

Writing Deflate Compressed Records

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("input dir")

// writes out compressed Avro records
df.write.avro("output dir")

Writing Partitioned Data

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("output dir")

With /tmp/output as the output directory, this code produces a directory structure like this:

-rw-r--r--   3 hdfs supergroup          0 2015-11-03 14:58 /tmp/output/_SUCCESS
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011/month=7
-rw-r--r--   3 hdfs supergroup        229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=7
-rw-r--r--   3 hdfs supergroup        231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=8
-rw-r--r--   3 hdfs supergroup        246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
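
The directory names in the listing follow a column=value pattern, one level per partition column. The following sketch derives those names from the rows in the example. It is plain Python with no Spark dependency, partition_dir is a hypothetical helper, and real writers also escape special characters in values, which this sketch ignores.

```python
columns = ["year", "month", "title", "rating"]
rows = [
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0),
]


def partition_dir(row, partition_columns):
    """Build the relative directory for one row from its partition values."""
    record = dict(zip(columns, row))
    return "/".join(f"{name}={record[name]}" for name in partition_columns)


# Rows that share partition values land in the same directory.
directories = sorted({partition_dir(row, ["year", "month"]) for row in rows})

for directory in directories:
    print(directory)
```

The four rows produce three directories, matching the three month directories in the listing.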

Reading Partitioned Data

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input dir")

df.printSchema()
df.filter("year = 2011").collect().foreach(println)

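This code automatically detects the partitioned data and combines the partitions into a single DataFrame, so you can query it the same way as unpartitioned data. Because the filter is on a partition column, only the required directory is read, which decreases disk I/O. The code prints the following: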

root
|-- title: string (nullable = true)
|-- rating: double (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)

[Git,2.0,2011,7]
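
Partition detection and directory pruning can be sketched in plain Python: the column values are recovered from the column=value path segments, and a filter on a partition column selects files by path alone. This only illustrates the idea and is not how Spark implements it; partition_values and prune are hypothetical helpers, and the paths are the ones from the Writing Partitioned Data listing.

```python
paths = [
    "/tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro",
    "/tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro",
    "/tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro",
]


def partition_values(path):
    """Recover partition columns from the column=value segments of a path."""
    values = {}
    for segment in path.split("/"):
        name, separator, raw = segment.partition("=")
        if separator:
            # Simplified type inference: digits become int, anything else stays a string.
            values[name] = int(raw) if raw.isdigit() else raw
    return values


def prune(candidate_paths, column, wanted):
    """Keep only the files whose partition value matches the filter."""
    return [p for p in candidate_paths if partition_values(p).get(column) == wanted]


print(partition_values(paths[0]))
print(prune(paths, "year", 2011))
```

A filter such as year = 2011 leaves a single file to read in this example; the other two directories are never opened.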

Specifying a Record Name

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input dir")

val name = "AvroTest"
val namespace = "com.cloudera.spark"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("output dir")

Java Example

Use the DataFrame API to query Avro files in Java. This example is almost identical to Scala Example with Format:

import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input dir");

// Saves the subset of the Avro records read in
df.filter("age > 5").write().format("com.databricks.spark.avro").save("output dir");

Python Example

Use the DataFrame API to query Avro files in Python. This example is almost identical to Scala Example with Format:

# Creates a DataFrame from a directory
df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")

# Saves the subset of the Avro records read in
df.where("age > 5").write.format("com.databricks.spark.avro").save("output dir")

Spark SQL Example

You can also register Avro data as a temporary table and then query it with Spark SQL:

CREATE TEMPORARY TABLE table_name USING com.databricks.spark.avro OPTIONS (path "input dir")