Apache Spark 3 integration with Schema Registry
The Apache Spark 3 integration with Schema Registry provides a library that uses Schema Registry to manage Spark schemas and to serialize and de-serialize messages in Spark data sources and sinks.
Running the example programs
The examples illustrate the API usage and how to integrate with Schema Registry. You can run the examples from an IDE (for example, IntelliJ) by specifying a master URL, or by using spark3-submit:
spark3-submit --jars /opt/cloudera/parcels/SPARK3/lib/spark3/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
--class com.hortonworks.spark.registry.examples.classname \
/opt/cloudera/parcels/SPARK3/lib/spark3/spark-schema-registry-for-spark3/examples/spark-schema-registry-for-spark3-examples_2.12.jar [***SCHEMA-REGISTRY-URL***] \
bootstrap-servers input-topic output-topic checkpoint-location
Using the APIs
Typically in a Spark application you define the Spark schema for the data you are going to process:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
// the schema for truck events
val schema = StructType(Seq(
  StructField("driverId", IntegerType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("miles", LongType, nullable = false),
  StructField("eventType", StringType, nullable = false),
  ...
  ...
))
// read JSON string messages from the data source
val messages = spark
  .readStream
  .format(...)
  .option(...)
  .load()
// parse the messages using the above schema and do further operations
val df = messages
  .select(from_json($"value".cast("string"), schema).alias("value"))
...
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"value.driverId", $"value.truckId", $"value.miles")
  .where("value.miles > 300")
However, this approach is not practical because the schema information is tightly coupled with the code. The code needs to be changed when the schema changes, and there is no ability to share or reuse the schema between the message producers and the applications that consume the messages.
Using Schema Registry is a better solution because it enables you to manage different versions of the schema and define compatibility policies.
Configuration
Import the utility methods into scope:
import com.hortonworks.spark.registry.util._
Then define an implicit SchemaRegistryConfig, which will be passed to the APIs. The main configuration parameter is the schema registry URL.
// the schema registry client config
val config = Map[String, Object]("[***SCHEMA.REGISTRY.URL***]" -> schemaRegistryUrl)
// the schema registry config that will be implicitly passed
implicit val srConfig: SchemaRegistryConfig = SchemaRegistryConfig(config)
SSL configuration
SchemaRegistryConfig expects the following SSL configuration properties:
"schema.registry.client.ssl.protocol" -> "SSL",
"schema.registry.client.ssl.trustStoreType" -> "JKS",
"schema.registry.client.ssl.trustStorePath" -> "/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks",
"schema.registry.client.ssl.trustStorePassword" -> "[***CHANGEMECLIENTPWD***]"
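The URL entry and the SSL properties can be assembled into a single client configuration map before it is wrapped in SchemaRegistryConfig. The following is a minimal sketch in plain Scala, assuming the SSL properties belong in the same map as the URL; RegistryClientConfig and its two methods are hypothetical helper names, not part of the library, and the URL property key is taken as a parameter rather than assumed.

```scala
object RegistryClientConfig {
  // The SSL properties listed above; only the truststore path and password vary.
  def sslProperties(trustStorePath: String, trustStorePassword: String): Map[String, Object] =
    Map[String, Object](
      "schema.registry.client.ssl.protocol" -> "SSL",
      "schema.registry.client.ssl.trustStoreType" -> "JKS",
      "schema.registry.client.ssl.trustStorePath" -> trustStorePath,
      "schema.registry.client.ssl.trustStorePassword" -> trustStorePassword
    )

  // Client config: the URL entry plus, optionally, the SSL properties.
  def clientConfig(urlKey: String, url: String,
                   ssl: Map[String, Object] = Map.empty): Map[String, Object] =
    Map[String, Object](urlKey -> url) ++ ssl
}

// Usage (requires the library on the classpath):
// val config = RegistryClientConfig.clientConfig(
//   "[***SCHEMA.REGISTRY.URL***]", schemaRegistryUrl,
//   RegistryClientConfig.sslProperties(trustStorePath, trustStorePassword))
// implicit val srConfig: SchemaRegistryConfig = SchemaRegistryConfig(config)
```

Keeping the SSL entries in their own function makes it easy to omit them for a registry that is not TLS-enabled.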
Fetching Spark schema by name
sparkSchema(schemaName: String)
Returns the Spark schema corresponding to the latest version of the schema defined in Schema Registry.
sparkSchema(schemaName: String, version: Int)
Returns the Spark schema corresponding to the given version of the schema defined in Schema Registry.
// retrieve the translated "Spark schema" by specifying the schema registry schema name
val schema = sparkSchema(name)
// parse the messages using the above schema and do further operations
val df = messages
  .select(from_json($"value".cast("string"), schema).alias("value"))
...
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"value.driverId", $"value.truckId", $"value.miles")
  .where("value.miles > 300")
Serializing messages using Schema Registry
to_sr(data: Column, schemaName: String, topLevelRecordName: String, namespace: String)
Converts Spark column data to the Schema Registry binary format. This looks up a Schema Registry schema for the schemaName that matches the input and automatically registers a new schema if none is found. The topLevelRecordName and namespace arguments are optional and are mapped to the Avro top-level record name and record namespace.
De-serializing messages using Schema Registry
from_sr(data: Column, schemaName: String)
Converts Schema Registry binary format to Spark column, using the latest version of the schema.
from_sr(data: Column, schemaName: String, version: Int)
Converts Schema Registry binary format to Spark column using the given Schema Registry schema name and version.
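The optional to_sr arguments and the versioned from_sr overload could be used as in the sketch below. It assumes Spark and the library are on the classpath and that the functions pick up the implicit SchemaRegistryConfig described under Configuration. VersionedCodec, the record name TruckEvent, and the namespace com.example.trucks are hypothetical names chosen for illustration.

```scala
import com.hortonworks.spark.registry.util._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}

object VersionedCodec {
  // De-serialize the "value" column with a fixed schema version instead of
  // the latest one registered under schemaName.
  def decode(messages: DataFrame, schemaName: String, version: Int)
            (implicit srConfig: SchemaRegistryConfig): DataFrame =
    messages.select(from_sr(col("value"), schemaName, version).alias("message"))

  // Serialize all columns as one record, naming the Avro top-level record and
  // namespace explicitly (both values are placeholders).
  def encode(events: DataFrame, schemaName: String)
            (implicit srConfig: SchemaRegistryConfig): DataFrame =
    events.select(
      to_sr(struct(col("*")), schemaName, "TruckEvent", "com.example.trucks").alias("value"))
}
```

Pinning the version on the read side is one way to keep a consumer stable while producers register newer schema versions; whether that is appropriate depends on the compatibility policy set for the schema.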
Serialization - deserialization example
The following example uses from_sr to de-serialize Schema Registry formatted messages into Spark, transforms them, serializes them back to Schema Registry format using to_sr, and writes the result to a data sink.
// read schema registry formatted messages and deserialize to Spark columns
val df = messages
  .select(from_sr($"value", topic).alias("message"))
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"message.driverId", $"message.truckId", $"message.miles")
  .where("message.miles > 300")
// write the output as schema registry serialized bytes to a sink
// should produce events like {"driverId":14,"truckId":25,"miles":373}
val query = filtered
  .select(to_sr(struct($"*"), outSchemaName).alias("value"))
  .writeStream
  .format(...)
  .start()
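The output schema outSchemaName is automatically published to the Schema Registry if it does not exist.
Building and deploying your app
Add the following Maven dependency to your project to use the library, then build your application JAR file: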
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>spark-schema-registry-for-spark3_2.12</artifactId>
<version>version</version>
</dependency>
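For projects built with sbt rather than Maven, the same coordinates could be declared as shown below. This is a sketch only: version is the same placeholder as in the Maven dependency, and a resolver for the repository that hosts the artifact is assumed to be configured separately.

```scala
// build.sbt
// "%" rather than "%%" because the artifact ID already carries the Scala suffix (_2.12)
libraryDependencies += "com.hortonworks" % "spark-schema-registry-for-spark3_2.12" % "version"
```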
Once the application JAR file is built, deploy it by adding the dependency in spark3-submit using --packages:
spark3-submit --packages com.hortonworks:spark-schema-registry-for-spark3_2.12:version \
--conf spark.jars.repositories=[***HTTPS://REPOSITORY.EXAMPLE.COM***] \
--class YourApp \
your-application-jar \
args ...
Make sure the package is published in a repository that is available locally or online.
Alternatively, pass the JAR file with dependencies using --jars:
spark3-submit --master [***MASTER URL***] \
--jars /opt/cloudera/parcels/SPARK3/lib/spark3/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
--class YourApp \
your-application-jar \
args ...
Running in a Kerberos-enabled cluster
The library works in a Kerberos setup, where Spark and Schema Registry are deployed on a Kerberos-enabled cluster.
To configure, set up the appropriate JAAS configuration for RegistryClient (and KafkaClient, if the Spark data source or sink is Kafka).
To run SchemaRegistryAvroExample in a Kerberos setup, follow these steps:
- Create a keytab (for example, app.keytab) with the login user and principal you want to run the application as.
- Create an app_jaas.conf file and specify the keytab and principal created in Step 1. If deploying to YARN, the keytab and conf files are distributed as YARN local resources. They are placed in the current directory of the Spark YARN container, so the location needs to be specified as ./app.keytab.
  RegistryClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./app.keytab"
    storeKey=true
    useTicketCache=false
    principal="[***PRINCIPAL***]";
  };
  KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./app.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="[***PRINCIPAL***]";
  };
- Provide the required ACLs for the Kafka topics (in-topic, out-topic) for the principal.
- Use spark3-submit to pass the JAAS configuration file with extraJavaOptions (and also as local resource files in YARN cluster mode).
  spark3-submit --master yarn --deploy-mode cluster \
    --keytab app.keytab --principal [***PRINCIPAL***] \
    --files app_jaas.conf#app_jaas.conf,app.keytab#app.keytab \
    --jars /opt/cloudera/parcels/SPARK3/lib/spark3/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./app_jaas.conf" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./app_jaas.conf" \
    --class com.hortonworks.spark.registry.examples.SchemaRegistryAvroExample \
    /opt/cloudera/parcels/SPARK3/lib/spark3/spark-schema-registry-for-spark3/examples/spark-schema-registry-for-spark3-examples_2.12.jar \
    [***SCHEMA-REGISTRY-URL***] bootstrap-server in-topic out-topic checkpoint-dir SASL_PLAINTEXT
Unsupported features
Apache Spark 3 integration with Schema Registry is not supported in pyspark.