Apache Spark 3 integration with Schema Registry
The Apache Spark 3 integration with Schema Registry provides a library that leverages Schema Registry for managing Spark schemas and for serializing and de-serializing messages in Spark data sources and sinks.
Running the example programs
The examples illustrate the API usage and how to integrate with Schema Registry. The examples can be run from an IDE (for example, IntelliJ) by specifying a master URL, or by using spark3-submit.
spark3-submit --jars /opt/cloudera/parcels/CDH/lib/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
--class com.hortonworks.spark.registry.examples.classname \
/opt/cloudera/parcels/CDH/lib/spark-schema-registry-for-spark3/examples/spark-schema-registry-for-spark3-examples_2.12.jar [***SCHEMA-REGISTRY-URL***] \
bootstrap-servers input-topic output-topic checkpoint-location
Using the APIs
Typically in a Spark application you define the Spark schema for the data you are going to process:
// the schema for truck events
val schema = StructType(Seq(
  StructField("driverId", IntegerType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("miles", LongType, nullable = false),
  StructField("eventType", StringType, nullable = false),
  ...
  ...
))
// read JSON string messages from the data source
val messages = spark
      .readStream
      .format(...)
      .option(...)
      .load()  
              
// parse the messages using the above schema and do further operations
val df = messages
      .select(from_json($"value".cast("string"), schema).alias("value"))
      ...      
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"value.driverId", $"value.truckId", $"value.miles")
      .where("value.miles > 300")
However, this approach is not practical because the schema information is tightly coupled with the code. The code needs to be changed when the schema changes, and there is no ability to share or reuse the schema between the message producers and the applications that consume the messages.
Using Schema Registry is a better solution because it enables you to manage different versions of the schema and define compatibility policies.
Configuration
To use the APIs, first import the utility methods:
import com.hortonworks.spark.registry.util._
Then define a SchemaRegistryConfig that is passed implicitly to the APIs. The main configuration parameter is the Schema Registry URL.
// the schema registry client config
val config = Map[String, Object]("schema.registry.url" -> schemaRegistryUrl)
// the schema registry config that will be implicitly passed
implicit val srConfig: SchemaRegistryConfig = SchemaRegistryConfig(config)
SSL configuration
SchemaRegistryConfig expects the following SSL configuration properties:
            "schema.registry.client.ssl.protocol" -> "SSL",
"schema.registry.client.ssl.trustStoreType" -> "JKS",
"schema.registry.client.ssl.trustStorePath" -> "/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks",
"schema.registry.client.ssl.trustStorePassword" ->  "[***CHANGEMECLIENTPWD***]" 
Fetching Spark schema by name
- sparkSchema(schemaName: String) - Returns the Spark schema corresponding to the latest version of the schema defined in the Schema Registry.
- sparkSchema(schemaName: String, version: Int) - Returns the Spark schema corresponding to the given version of the schema defined in the Schema Registry.
// retrieve the translated "Spark schema" by specifying the schema registry schema name
val schema = sparkSchema(name)
// parse the messages using the above schema and do further operations
val df = messages
         .select(from_json($"value".cast("string"), schema).alias("value"))
         ...
               
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"value.driverId", $"value.truckId", $"value.miles")
                  .where("value.miles > 300")
Serializing messages using Schema Registry
- to_sr(data: Column, schemaName: String, topLevelRecordName: String, namespace: String) - Converts Spark column data to the Schema Registry binary format. This looks up a Schema Registry schema for the schemaName that matches the input and automatically registers a new schema if one is not found. The topLevelRecordName and namespace arguments are optional and are mapped to the Avro top-level record name and record namespace (see the sketch below).
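A minimal sketch of serializing a struct column with the optional record name and namespace arguments; the schema name, record name, and namespace values are illustrative assumptions:
// serialize all columns into Schema Registry binary format, registering the schema
// as "truck_events_out" with an explicit Avro record name and namespace
val serialized = filtered
  .select(to_sr(struct($"*"), "truck_events_out", "TruckEvent", "com.example.trucks").alias("value"))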
De-serializing messages using Schema Registry
- from_sr(data: Column, schemaName: String) - Converts the Schema Registry binary format to a Spark column, using the latest version of the schema.
- from_sr(data: Column, schemaName: String, version: Int) - Converts the Schema Registry binary format to a Spark column, using the given Schema Registry schema name and version (see the sketch below).
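For illustration, a sketch of de-serializing against a pinned schema version; the schema name and version are example values:
// deserialize Schema Registry binary payloads using version 1 of the "truck_events" schema
val parsed = messages
  .select(from_sr($"value", "truck_events", 1).alias("message"))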
Serialization - deserialization example
The following example uses from_sr to de-serialize Schema Registry formatted messages into Spark columns, transforms them, serializes the result back to Schema Registry format using to_sr, and writes it to a data sink.
// Read schema registry formatted messages and deserialize to spark columns.
val df = messages
      .select(from_sr($"value", topic).alias("message"))
// project (driverId, truckId, miles) for the events where miles > 300
val filtered = df.select($"message.driverId", $"message.truckId", $"message.miles")
      .where("message.miles > 300")
// write the output as schema registry serialized bytes to a sink
// should produce events like {"driverId":14,"truckId":25,"miles":373}
val query = filtered
      .select(to_sr(struct($"*"), outSchemaName).alias("value"))
      .writeStream
      .format(...)
      .start()
The outSchemaName is automatically published to the Schema Registry if it does not already exist.
Building and deploying your app
To use the library in your application, add the following dependency to your Maven pom:
<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>spark-schema-registry-for-spark3_2.12</artifactId>
    <version>version</version>
</dependency>
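If you build with sbt instead of Maven, the equivalent dependency declaration would look like the following sketch (same coordinates; replace version with the actual library version):
// sbt equivalent of the Maven dependency above
libraryDependencies += "com.hortonworks" % "spark-schema-registry-for-spark3_2.12" % "version"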
You can then run your application with spark3-submit using --packages:
spark3-submit --packages com.hortonworks:spark-schema-registry-for-spark3_2.12:version \
--conf spark.jars.repositories=[***HTTPS://REPOSITORY.EXAMPLE.COM***] \
--class YourApp \
your-application-jar \
args ...
Alternatively, you can pass the library JAR to spark3-submit with --jars:
spark3-submit --master [***MASTER URL***] \
--jars /opt/cloudera/parcels/CDH/lib/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
--class YourApp \
your-application-jar \
args ...
Running in a Kerberos-enabled cluster
The library works in a Kerberos setup, where Spark and Schema Registry have been deployed on a Kerberos-enabled cluster.
To configure it, set up the appropriate JAAS configuration for RegistryClient (and for KafkaClient, if the Spark data source or sink is Kafka).
For example, to run SchemaRegistryAvroExample in a Kerberos setup, follow these steps:
- Create a keytab (for example, app.keytab) with the login user and principal that you want to use to run the application.
- Create an app_jaas.conf file and specify the keytab and principal created in Step 1. If deploying to YARN, the keytab and conf files will be distributed as YARN local resources. They will be placed in the current directory of the Spark YARN container, and the location needs to be specified as ./app.keytab.
RegistryClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./app.keytab"
  storeKey=true
  useTicketCache=false
  principal="[***PRINCIPAL***]";
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="./app.keytab"
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  principal="[***PRINCIPAL***]";
};
- Provide the required ACLs for the Kafka topics (in-topic, out-topic) for the principal.
- Use spark3-submit to pass the JAAS configuration file with extraJavaOptions (and also as local resource files in YARN cluster mode).
spark3-submit --master yarn --deploy-mode cluster \
--keytab app.keytab --principal [***PRINCIPAL***] \
--files app_jaas.conf#app_jaas.conf,app.keytab#app.keytab \
--jars /opt/cloudera/parcels/CDH/lib/spark-schema-registry-for-spark3/spark-schema-registry-for-spark3_2.12-jar-with-dependencies.jar \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./app_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./app_jaas.conf" \
--class com.hortonworks.spark.registry.examples.SchemaRegistryAvroExample \
/opt/cloudera/parcels/CDH/lib/spark-schema-registry-for-spark3/examples/spark-schema-registry-for-spark3-examples_2.12.jar \
[***SCHEMA-REGISTRY-URL***] bootstrap-server in-topic out-topic checkpoint-dir SASL_PLAINTEXT
Unsupported features
Apache Spark 3 integration with Schema Registry is not supported in pyspark.
