Example: Using the HBase-Spark connector

Learn how to use the HBase-Spark connector by following an example scenario.

If you follow the instructions mentioned in Configure HBase-Spark connector using Cloudera Manager topic, Cloudera Manager automatically configures the connector for Spark. If you have not, add the following parameters to the command line while running spark-submit, spark-shell, pyspark commands.

Spark2:
--conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/scala-library.jar,`hbase mapredcp | tr : ,`
You can use the following command to create the hbase-site.xml.jar file. The hbase-site.xml is added to the classpath with the spark.jars parameter because it is part of the jar file’s root path.
jar cf hbase-site.xml.jar hbase-site.xml

Schema

In this example we want to store personal data in an HBase table. We want to store name, email address, birth date and height as a floating point number. The contact information (email) is stored in the c column family and personal information (birth date, height) is stored in the p column family. The key in HBase table will be the name attribute.
Spark HBase
Type/Table Person person
Name name: String key
Email address email: String c:email
Birth date birthDate: Date p:birthDate
Height height: Float p:height

Create HBase table

Use the following command to create the HBase table:
shell> create 'person', 'p', 'c'

Insert data (Scala)

Use the following spark code in spark-shell to insert data into our HBase table:
val sql = spark.sqlContext

import java.sql.Date

case class Person(name: String,
                 email: String,
                 birthDate: Date,
                 height: Float)

var personDS = Seq(
 Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
 Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

personDS.write.format("org.apache.hadoop.hbase.spark")
 .option("hbase.columns.mapping",
   "name STRING :key, email STRING c:email, " +
     "birthDate DATE p:birthDate, height FLOAT p:height")
 .option("hbase.table", "person")
 .option("hbase.spark.use.hbasecontext", false)
 .save()

Insert data (Python)

Use the following spark code in pyspark to insert data into our HBase table:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType

data = [("alice","alice@alice.com", datetime.strptime("2000-01-01",'%Y-%m-%d'), 4.5),
    ("bob","bob@bob.com", datetime.strptime("2001-10-17",'%Y-%m-%d'), 5.1)
  ]

schema = StructType([ \
    StructField("name",StringType(),True), \
    StructField("email",StringType(),True), \
    StructField("birthDate", DateType(),True), \
    StructField("height", FloatType(), True)
  ])
 
personDS = spark.createDataFrame(data=data,schema=schema)

personDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).save()

Scan data

The previously inserted data can be tested with a simple scan:
shell> scan ‘person’
ROW		COLUMN+CELL
 alice		column=c:email, timestamp=1568723598292, value=alice@alice.com
 alice		column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
 alice		column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
 bob		column=c:email, timestamp=1568723598521, value=bob@bob.com
 bob		column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
 bob		column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)

Read data back (Scala)

Use the following snippet in spark-shell to read the data back:
val sql = spark.sqlContext

val df = sql.read.format("org.apache.hadoop.hbase.spark")
 .option("hbase.columns.mapping",
   "name STRING :key, email STRING c:email, " +
     "birthDate DATE p:birthDate, height FLOAT p:height")
 .option("hbase.table", "person")
 .option("hbase.spark.use.hbasecontext", false)
 .load()
df.createOrReplaceTempView("personView")

val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The result of this snippet is the following Data Frame:
+-----+------+---------------+----------+
| name|height|          email| birthDate|
+-----+------+---------------+----------+
|alice|   4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+

Read data back (Python)

Use the following snippet in pyspark to read the data back:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()

df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()

Test spark-submit

Use the following snippet to test spark-submit commands in Spark cluster mode.

pyspark_app.py:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()

df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
spark.stop()

Spark2 test commands:

spark-submit --deploy-mode cluster --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/scala-library.jar,`hbase mapredcp | tr : ,` pyspark_app.py