HBase MCC Usage in Spark with Scala
The Spark implementation does not support multiple keytabs. You must configure and enable the cross-realm kerberos for Spark to operate properly.
You do not need to distribute the keytabs or configuration files on the cluster, instead make them available on the edge node from which you start the Spark application. The principal and keytab are referenced twice. First, the credentials must be passed to start the application on the cluster and second, they must be made available to the code responsible for creating credentials for each cluster.
Spark driver writes the credentials to HDFS and later executors obtain and apply them before any code is executed. A process for both the driver and executor is started to ensure the credentials are refreshed in a timely manner.
The following example uses a constructor method for the HBase MCC configuration. It assumes that you are passing hbase-site.xml and core-site.xml into your executable main method. While using this utility, you must modify the configurations using the configuration files only. This utility does not support Spark streaming.
--conf "spark.security.credentials.hbase.enabled=false"
val HBASE_CLIENT_USER_PROVIDER_CLASS = "hbase.client.userprovider.class"
mccConf.set(HBASE_CLIENT_USER_PROVIDER_CLASS, ConfigConst.HBASE_MCC_USER_PROVIDER)
Example on writing to HBase with Spark DataFrame API
spark-submit --class com.cloudera.hbase.mcc.scala.test.SparkToHbase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.
- The first part of the code sets up HBase to override the connection implementation in
hbase-spark along with setting up variables containing configuration information for the two
clusters. A recent addition is HBASE_CLIENT_USER_PROVIDER_CLASS used in MCC
to allow Spark to start jobs where an HBase cluster may be down for
maintenance.
val HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl" val HBASE_CLIENT_USER_PROVIDER_CLASS = "hbase.client.userprovider.class" val CONNECTION_IMPL = "com.cloudera.hbase.mcc.ConnectionMultiCluster" val primaryHBaseSite = args(0) val primaryCoreSite = args(1) val failoverHBaseSite = args(2) val failoverCoreSite = args(3)
- HBase MCC provides a base configuration that stores the configurations for the MCC
applications. The first step is to create an instance of the configuration and set the
override from the variables above. For spark, it is also required to set the
hbase.mcc.userName and hbase.mcc.applicationId.
These are used to write the delegation tokens into HDFS; in addition, it is required to
set fs.defaultFS from the current Spark
execution.
val mccConf = new MultiClusterConf mccConf.set(HBASE_CLIENT_CONNECTION_IMPL, CONNECTION_IMPL) //Sets the default FS to that of the cluster submitting the spark job mccConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, sc.hadoopConfiguration.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) mccConf.set("hbase.mmc.userName", sc.sparkUser) mccConf.set("hbase.mcc.applicationId", sc.applicationId)
- The HBase configurations are stored independently and prefixed when loaded into the
application to maintain isolation between each cluster configuration and the base MCC
configuration. The following code segment shows HBase execution related parameters,
which you plan to
override.
val primary = HBaseConfiguration.create() primary.addResource(new Path(primaryHBaseSite)) primary.addResource(new Path(primaryCoreSite)) primary.set("hbase.client.retries.number", "1"); //Override Default Parameters primary.set("hbase.client.pause", "1"); //Override Default Parameters primary.set("zookeeper.recovery.retry", "0"); //Override Default Parameters val failover = HBaseConfiguration.create() failover.addResource(new Path(failoverHBaseSite)) failover.addResource(new Path(failoverCoreSite)) failover.set("hbase.client.retries.number", "1"); //Override Default Parameters failover.set("hbase.client.pause", "1"); //Override Default Parameters
- Once the configurations for each HBase cluster have been created, it is time to call
the code that manages the delegation tokens for the clusters. This process returns the
token name to the configuration and ensures that the executor is reading the proper
tokens and applying the
credentials.
val credentialsManager = CredentialsManager.getInstance primary.set(ConfigConst.HBASE_MCC_TOKEN_FILE_NAME, credentialsManager.confTokenForCluster(primaryHBaseSite, primaryCoreSite, sc)) failover.set(ConfigConst.HBASE_MCC_TOKEN_FILE_NAME, credentialsManager.confTokenForCluster(failoverHBaseSite, failoverCoreSite, sc))
- Each of the final cluster configurations must be added to the MCC configuration and
the following function prefixes all of the parameters for each cluster. As the
hbase-spark implementation only takes a single configuration, everything is merged into
an Uber
configuration.
mccConf.addClusterConfig(primary) mccConf.addClusterConfig(failover)
- The HBaseContext from hbase-spark is created and the dataframe API
is used as in any other case for
Spark.
import spark.implicits._ new HBaseContext(sc, mccConf.getConfiguration()) val rdd = sc.parallelize(Seq(("rowkey","SparkToHbase","0","","1"))) val df_withcol = rdd.toDF("rowKey", "application", "batchId", "timeStamp", "loaded_events") logger.info("Data frame to Hbase : " + df_withcol.show()) val hbaseTableName = "test_table" val hbaseTableSchema ="""rowKey STRING :key, application STRING cf1:APP, batchId STRING cf1:BID, timeStamp STRING cf1:TS, loaded_events STRING cf1:PR""" logger.info("Started writing to Hbase") df_withcol.write.format("org.apache.hadoop.hbase.spark") .options(Map("hbase.columns.mapping" -> hbaseTableSchema, "hbase.table" -> hbaseTableName)) .save() logger.info("Completed writing to hbase")
Example on writing to HBase with Spark Streaming
The following example shows an application to an HDFS directory for new files. When a file is added, it counts each of the words, keeping record of the count for any previously read file and then updates HBase with the new count.
spark-submit --class com.cloudera.hbase.mcc.scala.test.SparkStreamingExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.
In this example the configuration must be executed inside the putHBase function. That is why the variables for the token file names must be set outside of the function. If you try to use the Spark Streaming Context inside that function, it throws an error as MCC is not serializable. This does not happen in structured streaming which is shown in the following examples.
Example on reading from HDFS and writing to HBase with Spark structured streaming
In this example an application is created that will listen to an HDFS directory for new files. When a file is added, it will read the file into a micro-batch using structured streaming and push the results into HBase.
spark-submit --class com.cloudera.hbase.mcc.scala.test.StructuredStreamingHDFSToHBase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-memory 2g \
--executor-memory 2g \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/cluster1/hbase-site1.xml,/tmp/cluster1/core-site1.xml,/tmp/cluster2/hbase-site2.xml,/tmp/cluster2/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn \
--deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml \
/user/exampleuser/StructuredStreamingExampleSource/ /user/exampleuser/SparkStreamingExampleCheckpoint/
Example code: you can view the example code on this GitHub repository.
Example on reading from Kafka and writing to HBase with Spark structured streaming
The following example shows how to create an application that pulls messages from Kafka. When you push a set of messages to Kafka, this process creates a micro-batch using the structured streaming and push the results into HBase.
spark.kafkatohbase.target.hbase.table test_table
spark.kafkatohbase.target.hbase.cf cf1
spark.kafkatohbase.source.kafka.topic spark_2
spark.kafkatohbase.checkpoint.dir /user/exampleuser/SparkStreamingExampleCheckpoint/
spark.kafkatohbase.kafka.ssl.truststore.location ./cm-auto-global_truststore.jks
spark.kafkatohbase.kafka.bootstrap.servers yourzookeeper1:9093,yourzookeeper2:9093,yourzookeeper3:9093
spark.kafkatohbase.kafka.security.protocol SASL_SSL
spark.kafkatohbase.kafka.ssl.truststore.password trustorepasswordhere
spark.kafkatohbase.startingOffsets earliest
spark.kafkatohbase.keytab kafka.keytab
spark.kafkatohbase.principal kafka/example.server.com@EXAMPLE.COM
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
debug =true
useKeyTab=true
storeKey=true
keyTab="/tmp/kafka.keytab"
useTicketCache=false
serviceName="kafka"
principal="kafka/example.server.com@EXAMPLE.COM";
};
hdfs dfs -rm -r /user/cluster1tls/SparkStreamingExampleCheckpoint/
hdfs dfs -mkdir /user/cluster1tls/SparkStreamingExampleCheckpoint/
spark-submit --class com.cloudera.hbase.mcc.scala.test.StructuredStreamingExampleKafkaToHBase \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-memory 2g \
--executor-memory 2g \
--conf spark.streaming.receiver.maxRate=20 \
--conf spark.dynamicAllocation.minExecutors=5 \
--conf spark.dynamicAllocation.initialExecutors=5 \
--conf spark.dynamicAllocation.maxExecutors=10 \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--conf "spark.security.credentials.hive.enabled=false" \
--conf "spark.security.credentials.hdfs.enabled=false" \
--driver-java-options "-Djava.security.auth.login.config=./producer_jaas2.conf -Djava.io.tmpdir=/tmp" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./producer_jaas2.conf -Djava.io.tmpdir=/tmp" \
--files /tmp/cluster1/hbase-site1.xml,/tmp/cluster1/core-site1.xml,/tmp/cluster2/hbase-site2.xml,/tmp/cluster2/core-site2.xml,/tmp/producer_jaas.conf#producer_jaas.conf,/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks,/tmp/kafka.keytab \
--principal exampleuser/example.server.com@EXAMPLE.COM" \
--keytab /tmp/cluster1/exampleuser.keytab \
--master yarn \
--deploy-mode cluster \
--properties-file /tmp/properties.conf \
/tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.
Example on writing to HBase bulk put and hbase-spark
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkPutExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.
Example on writing to HBase bulk get and hbase-spark
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkGetExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.
Example on writing to HBase bulk delete and hbase-spark
spark-submit --class com.cloudera.hbase.mcc.scala.test.HBaseMCCBulkDeleteExample \
--jars "/tmp/hbase-mcc-0.2.0-SNAPSHOT.jar" \
--driver-class-path "hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.executor.extraClassPath=hbase-mcc-0.2.0-SNAPSHOT.jar" \
--conf "spark.hbase.connector.security.credentials.enabled=true" \
--conf "spark.yarn.principal=exampleuser/example.kdc.server.com@EXAMPLE.COM" \
--conf "spark.yarn.keytab=/tmp/configs/exampleuser.keytab" \
--conf "spark.security.credentials.hbase.enabled=false" \
--files /tmp/configs/hbase-site1.xml,/tmp/configs/core-site1.xml,/tmp/configs/hbase-site2.xml,/tmp/configs/core-site2.xml \
--principal exampleuser/example.kdc.server.com@EXAMPLE.COM \
--keytab /tmp/configs/exampleuser.keytab \
--master yarn --deploy-mode cluster /tmp/ScalaHbaseTest.jar hbase-site1.xml core-site1.xml hbase-site2.xml core-site2.xml
Example code: you can view the example code on this GitHub repository.