HBase MCC Usage in Spark with Scala

The Spark implementation does not support multiple keytabs. You must configure and enable cross-realm Kerberos for Spark to operate properly.

You do not need to distribute the keytabs or configuration files on the cluster. Instead, make them available on the edge node from which you start the Spark application. The principal and keytab are referenced twice: first, the credentials are passed to start the application on the cluster; second, they are made available to the code responsible for creating credentials for each cluster.
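As a minimal sketch of the "referenced twice" point, the hypothetical helper below assembles the credential-related part of a spark-submit argument list from a single principal and keytab, so the two references cannot drift apart. The names `SubmitCredentials` and `credentialArgs` are illustrative and not part of HBase MCC. The flags are the ones that appear in the example commands in this document. Which of the two references serves which purpose is an assumption made here for illustration.

```scala
// Hypothetical helper, not part of HBase MCC: it only assembles command-line arguments.
object SubmitCredentials {

  /** Returns the spark-submit arguments that carry the principal and keytab. */
  def credentialArgs(principal: String, keytab: String): Seq[String] = Seq(
    // First reference (assumed role): starts the application on the cluster.
    "--principal", principal,
    "--keytab", keytab,
    // Second reference (assumed role): read by the code that creates
    // credentials for each HBase cluster.
    "--conf", s"spark.yarn.principal=$principal",
    "--conf", s"spark.yarn.keytab=$keytab"
  )
}
```

Building both references from the same two values avoids the easy mistake of updating the keytab path in one place only.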

The Spark driver writes the credentials to HDFS, and the executors later obtain and apply them before any code is executed. A process is started on both the driver and the executors to ensure that the credentials are refreshed in a timely manner.

The following example uses a constructor method for the HBase MCC configuration. It assumes that you are passing hbase-site.xml and core-site.xml into your executable main method. While using this utility, you must modify the configurations using the configuration files only. This utility does not support Spark streaming.

To start the Spark application when the primary cluster's HBase is down, use the following parameter:
--conf "spark.security.credentials.hbase.enabled=false"
You must override the HBase UserProvider class with the UserProviderMultiCluster class. If you are using the constructor method, this is set automatically.
val HBASE_CLIENT_USER_PROVIDER_CLASS = "hbase.client.userprovider.class"
mccConf.set(HBASE_CLIENT_USER_PROVIDER_CLASS, ConfigConst.HBASE_MCC_USER_PROVIDER)

Example on writing to HBase with Spark DataFrame API

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.SparkToHbase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml  \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
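The four trailing arguments of the command above are positional, and they are bare file names because `--files` places the listed files in the working directory of each YARN container. A minimal sketch of validating them before use could look like the following. `ClusterFiles` and `parse` are hypothetical names; the field names mirror the variables used in the example code.

```scala
// Hypothetical argument holder, not part of HBase MCC.
final case class ClusterFiles(
    primaryHBaseSite: String,
    primaryCoreSite: String,
    failoverHBaseSite: String,
    failoverCoreSite: String)

object ClusterFiles {

  /** Parses the first four positional arguments, or explains what is missing. */
  def parse(args: Array[String]): Either[String, ClusterFiles] =
    args.toList match {
      case p1 :: p2 :: f1 :: f2 :: _ => Right(ClusterFiles(p1, p2, f1, f2))
      case other =>
        Left(s"Expected 4 configuration file names but got ${other.size}")
    }
}
```

Returning an `Either` lets the main method fail with a readable message instead of an `ArrayIndexOutOfBoundsException` when an argument is missing.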

Example code: you can view the example code on this GitHub repository.

Breakdown of the code execution:
  • The first part of the code sets up HBase to override the connection implementation in hbase-spark along with setting up variables containing configuration information for the two clusters. A recent addition is HBASE_CLIENT_USER_PROVIDER_CLASS used in MCC to allow Spark to start jobs where an HBase cluster may be down for maintenance.
    val HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"
    val HBASE_CLIENT_USER_PROVIDER_CLASS = "hbase.client.userprovider.class"
    val CONNECTION_IMPL = "com.cloudera.hbase.mcc.ConnectionMultiCluster"
    val primaryHBaseSite = args(0)
    val primaryCoreSite = args(1)
    val failoverHBaseSite = args(2)
    val failoverCoreSite = args(3)
    
  • HBase MCC provides a base configuration that stores the configurations for the MCC applications. The first step is to create an instance of the configuration and set the override from the variables above. For Spark, you must also set hbase.mcc.userName and hbase.mcc.applicationId, which are used to write the delegation tokens into HDFS. In addition, you must set fs.defaultFS from the current Spark execution.
    val mccConf = new MultiClusterConf
    mccConf.set(HBASE_CLIENT_CONNECTION_IMPL, CONNECTION_IMPL)
    //Sets the default FS to that of the cluster submitting the spark job
    mccConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, sc.hadoopConfiguration.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY))
    mccConf.set("hbase.mcc.userName", sc.sparkUser)
    mccConf.set("hbase.mcc.applicationId", sc.applicationId)
    
  • The HBase configurations are stored independently and prefixed when loaded into the application to maintain isolation between each cluster configuration and the base MCC configuration. The following code segment loads the configuration files for each cluster and overrides selected HBase client parameters.
    val primary = HBaseConfiguration.create()
    primary.addResource(new Path(primaryHBaseSite))
    primary.addResource(new Path(primaryCoreSite))
    primary.set("hbase.client.retries.number", "1") //Override Default Parameters
    primary.set("hbase.client.pause", "1") //Override Default Parameters
    primary.set("zookeeper.recovery.retry", "0") //Override Default Parameters

    val failover = HBaseConfiguration.create()
    failover.addResource(new Path(failoverHBaseSite))
    failover.addResource(new Path(failoverCoreSite))
    failover.set("hbase.client.retries.number", "1") //Override Default Parameters
    failover.set("hbase.client.pause", "1") //Override Default Parameters
    
  • Once the configurations for each HBase cluster have been created, it is time to call the code that manages the delegation tokens for the clusters. This process returns the token name to the configuration and ensures that the executor is reading the proper tokens and applying the credentials.
    val credentialsManager = CredentialsManager.getInstance
    primary.set(ConfigConst.HBASE_MCC_TOKEN_FILE_NAME, credentialsManager.confTokenForCluster(primaryHBaseSite, primaryCoreSite, sc))
    failover.set(ConfigConst.HBASE_MCC_TOKEN_FILE_NAME, credentialsManager.confTokenForCluster(failoverHBaseSite, failoverCoreSite, sc))
    
  • Each final cluster configuration must be added to the MCC configuration. The following function prefixes all of the parameters for each cluster. Because the hbase-spark implementation only takes a single configuration, everything is merged into one uber configuration.
    mccConf.addClusterConfig(primary)
    mccConf.addClusterConfig(failover)
    
  • The HBaseContext from hbase-spark is created, and the DataFrame API is used as in any other Spark application.
    import spark.implicits._

    new HBaseContext(sc, mccConf.getConfiguration())

    val rdd = sc.parallelize(Seq(("rowkey","SparkToHbase","0","","1")))
    val df_withcol = rdd.toDF("rowKey", "application", "batchId", "timeStamp", "loaded_events")
    // show() prints the DataFrame and returns Unit, so it is called on its own line
    logger.info("Data frame to Hbase:")
    df_withcol.show()
    val hbaseTableName = "test_table"
    val hbaseTableSchema ="""rowKey STRING :key, application STRING cf1:APP, batchId STRING cf1:BID, timeStamp STRING cf1:TS, loaded_events STRING cf1:PR"""
    logger.info("Started writing to Hbase")
    df_withcol.write.format("org.apache.hadoop.hbase.spark")
      .options(Map("hbase.columns.mapping" -> hbaseTableSchema, "hbase.table" -> hbaseTableName))
      .save()
    logger.info("Completed writing to hbase")
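The prefix-and-merge step in the breakdown above can be illustrated with plain Scala collections. This is a sketch under stated assumptions, not the MultiClusterConf implementation: immutable maps stand in for Hadoop Configuration objects, and the `clusterN.` prefix scheme and the names `PrefixedMerge`, `prefixed`, and `merge` are made up for the illustration.

```scala
// Illustration only: plain Maps stand in for Hadoop Configuration objects, and the
// "clusterN." prefix is a made-up scheme, not the one MultiClusterConf really uses.
object PrefixedMerge {
  type Conf = Map[String, String]

  /** Prefixes every key of a cluster configuration with the given cluster prefix. */
  def prefixed(prefix: String, conf: Conf): Conf =
    conf.map { case (key, value) => s"$prefix.$key" -> value }

  /** Merges the base configuration with each cluster configuration under its own prefix. */
  def merge(base: Conf, clusters: Seq[Conf]): Conf =
    clusters.zipWithIndex.foldLeft(base) { case (acc, (conf, index)) =>
      acc ++ prefixed(s"cluster$index", conf)
    }
}
```

Because each cluster's keys live under their own prefix, the same parameter (for example hbase.client.retries.number) can hold a different value per cluster inside the single configuration that hbase-spark accepts.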
    

Example on writing to HBase with Spark Streaming

The following example shows an application that listens to an HDFS directory for new files. When a file is added, the application counts each of the words, keeps a running count across all previously read files, and then updates HBase with the new count.
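The running-count logic described above can be sketched with plain Scala collections. This is only an illustration of the counting step: the actual example relies on Spark Streaming and writes to HBase, neither of which appears here, and the names `RunningWordCount`, `countWords`, and `update` are hypothetical.

```scala
// Illustration only: in-memory Maps stand in for streaming state and for the HBase table.
object RunningWordCount {

  /** Counts the words in the lines of one newly arrived file. */
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  /** Adds the counts from a new file to the totals kept from earlier files. */
  def update(totals: Map[String, Long], fresh: Map[String, Long]): Map[String, Long] =
    fresh.foldLeft(totals) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n)
    }
}
```

For instance, after a first file containing "a b a" and a second containing "b c", the totals would hold a and b with a count of 2 each and c with a count of 1.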

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.SparkStreamingExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml  \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml

Example code: you can view the example code on this GitHub repository.

In this example, the configuration code must be executed inside the putHBase function, which is why the variables for the token file names must be set outside of the function. If you try to use the Spark Streaming Context inside that function, it throws an error because MCC is not serializable. This does not happen in structured streaming, which is shown in the following examples.
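The serialization constraint can be reproduced without Spark. Spark ships functions to executors using Java serialization, so anything a function carries with it must be serializable. The sketch below uses hypothetical stand-in classes (`ClusterHandle`, `TokenNames`, `HandleHolder`) rather than the MCC classes: a value holding only the token file names serializes, while a value holding a non-serializable object does not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical stand-in for an object that cannot be shipped to executors.
  final class ClusterHandle(val name: String)

  // Carries only the token file names, which are plain strings.
  final case class TokenNames(primary: String, failover: String)

  // Carries the non-serializable handle itself.
  final case class HandleHolder(handle: ClusterHandle)

  /** Returns true if Java serialization of the value succeeds. */
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

This mirrors the pattern in the paragraph above: pass the token file names (strings) into the function and build the non-serializable objects inside it, on the executor.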

Example on reading from HDFS and writing to HBase with Spark structured streaming

In this example an application is created that will listen to an HDFS directory for new files. When a file is added, it will read the file into a micro-batch using structured streaming and push the results into HBase.

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.StructuredStreamingHDFSToHBase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-memory 2g \
--executor-memory 2g \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/cluster1/hbase-site1.xml,/tmp/cluster1/core-site1.xml,/tmp/cluster2/hbase-site2.xml,/tmp/cluster2/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn \
--deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml \
/user/exampleuser/StructuredStreamingExampleSource/ /user/exampleuser/SparkStreamingExampleCheckpoint/

Example code: you can view the example code on this GitHub repository.

Example on reading from Kafka and writing to HBase with Spark structured streaming

The following example shows how to create an application that pulls messages from Kafka. When you push a set of messages to Kafka, this process creates a micro-batch using structured streaming and pushes the results into HBase.

For this use case, a properties file is set up to pass into the application for the required Kafka configurations. Example properties file:
spark.kafkatohbase.target.hbase.table test_table
spark.kafkatohbase.target.hbase.cf cf1
spark.kafkatohbase.source.kafka.topic spark_2
spark.kafkatohbase.checkpoint.dir /user/exampleuser/SparkStreamingExampleCheckpoint/
spark.kafkatohbase.kafka.ssl.truststore.location ./cm-auto-global_truststore.jks
spark.kafkatohbase.kafka.bootstrap.servers yourzookeeper1:9093,yourzookeeper2:9093,yourzookeeper3:9093
spark.kafkatohbase.kafka.security.protocol SASL_SSL
spark.kafkatohbase.kafka.ssl.truststore.password trustorepasswordhere
spark.kafkatohbase.startingOffsets earliest
spark.kafkatohbase.keytab kafka.keytab
spark.kafkatohbase.principal kafka/example.server.com@EXAMPLE.COM
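To make the shape of this file concrete, here is a minimal sketch that parses whitespace-separated "key value" lines and pulls out the entries under `spark.kafkatohbase.kafka.` as bare Kafka option names. How the real example application reads these properties is not shown in this document, so the prefix stripping and the names `KafkaToHBaseProps`, `parse`, and `kafkaOptions` are assumptions made for illustration.

```scala
// Hypothetical helper, not taken from the example application.
object KafkaToHBaseProps {
  private val Prefix = "spark.kafkatohbase."

  /** Parses "key value" lines, the whitespace-separated form used in the example file. */
  def parse(lines: Seq[String]): Map[String, String] =
    lines
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .flatMap { line =>
        line.split("\\s+", 2) match {
          case Array(key, value) => Some(key -> value.trim)
          case _                 => None // skip lines that have no value
        }
      }
      .toMap

  /** Extracts the entries under spark.kafkatohbase.kafka. as bare Kafka option names. */
  def kafkaOptions(props: Map[String, String]): Map[String, String] = {
    val kafkaPrefix = Prefix + "kafka."
    props.collect {
      case (key, value) if key.startsWith(kafkaPrefix) =>
        key.stripPrefix(kafkaPrefix) -> value
    }
  }
}
```

With the example file above, `kafkaOptions` would keep entries such as security.protocol and bootstrap.servers and leave out application-level keys such as the target table and checkpoint directory.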
A jaas.conf file also needs to be set up:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="/tmp/kafka.keytab"
    useTicketCache=false
    serviceName="kafka"
    principal="kafka/example.server.com@EXAMPLE.COM";
};
Example command:
hdfs dfs -rm -r /user/exampleuser/SparkStreamingExampleCheckpoint/
hdfs dfs -mkdir /user/exampleuser/SparkStreamingExampleCheckpoint/
spark-submit --class com.cloudera.hbase.mcc.scala.test.StructuredStreamingExampleKafkaToHBase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-memory 2g \
--executor-memory 2g \
--conf spark.streaming.receiver.maxRate=20 \
--conf spark.dynamicAllocation.minExecutors=5 \
--conf spark.dynamicAllocation.initialExecutors=5 \
--conf spark.dynamicAllocation.maxExecutors=10 \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--conf "spark.security.credentials.hive.enabled=false" \
--conf "spark.security.credentials.hdfs.enabled=false" \
--driver-java-options "-Djava.security.auth.login.config=./producer_jaas.conf -Djava.io.tmpdir=/tmp" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./producer_jaas.conf -Djava.io.tmpdir=/tmp" \
--files /tmp/cluster1/hbase-site1.xml,/tmp/cluster1/core-site1.xml,/tmp/cluster2/hbase-site2.xml,/tmp/cluster2/core-site2.xml,/tmp/producer_jaas.conf#producer_jaas.conf,/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks,/tmp/kafka.keytab \
--principal exampleuser/example.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn \
--deploy-mode cluster \
--properties-file  /tmp/properties.conf \
/tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml

Example code: you can view the example code on this GitHub repository.

Example on writing to HBase with bulk put and hbase-spark

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkPutExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml  \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml

Example code: you can view the example code on this GitHub repository.

Example on reading from HBase with bulk get and hbase-spark

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkGetExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml  \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml

Example code: you can view the example code on this GitHub repository.

Example on deleting from HBase with bulk delete and hbase-spark

Example command:
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkDeleteExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml  \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml

Example code: you can view the example code on this GitHub repository.