Example: Using the HBase-Spark connector

Learn how to use the HBase-Spark connector by following an example scenario.

If you follow the instructions mentioned in Configure HBase-Spark connector using Cloudera Manager topic, Cloudera Manager automatically configures the connector for Spark. If you have not, add the following parameters to the command line while running spark-submit, spark-shell, or pyspark commands.

  • --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,`
You can use the following command to create the hbase-site.xml.jar file. The hbase-site.xml is added to the classpath with the spark.jars parameter because it is part of the jar file’s root path.
jar cf hbase-site.xml.jar hbase-site.xml

Schema

In this example we want to store personal data in an HBase table. We want to store name, email address, birth date and height as a floating point number. The contact information (email) is stored in the c column family and personal information (birth date, height) is stored in the p column family. The key in HBase table will be the name attribute.
Spark HBase
Type/Table Person person
Name name: String key
Email address email: String c:email
Birth date birthDate: Date p:birthDate
Height height: Float p:height

Create HBase table

Use the following command to create the HBase table:
shell> create 'person', 'p', 'c'

Insert data (Scala)

Use the following spark code in spark-shell to insert data into our HBase table:
val sql = spark.sqlContext

import java.sql.Date

case class Person(name: String,
                 email: String,
                 birthDate: Date,
                 height: Float)

var personDS = Seq(
 Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
 Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

if (true) {
  personDS.write.format("org.apache.hadoop.hbase.spark")
   .option("hbase.columns.mapping",
     "name STRING :key, email STRING c:email, " +
       "birthDate DATE p:birthDate, height FLOAT p:height")
   .option("hbase.table", "person")
   .option("hbase.spark.use.hbasecontext", false)
   .save()
}

Insert data (Python)

Use the following spark code in pyspark to insert data into our HBase table:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType

data = [("alice","alice@alice.com", datetime.strptime("2000-01-01",'%Y-%m-%d'), 4.5),
    ("bob","bob@bob.com", datetime.strptime("2001-10-17",'%Y-%m-%d'), 5.1)
  ]

schema = StructType([ \
    StructField("name",StringType(),True), \
    StructField("email",StringType(),True), \
    StructField("birthDate", DateType(),True), \
    StructField("height", FloatType(), True)
  ])
 
personDS = spark.createDataFrame(data=data,schema=schema)

personDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).save()

Scan data

The previously inserted data can be tested with a simple scan:
shell> scan ‘person’
ROW		COLUMN+CELL
 alice		column=c:email, timestamp=1568723598292, value=alice@alice.com
 alice		column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
 alice		column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
 bob		column=c:email, timestamp=1568723598521, value=bob@bob.com
 bob		column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
 bob		column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)

Read data back (Scala)

Use the following snippet in spark-shell to read the data back:
val sql = spark.sqlContext
var df = spark.emptyDataFrame

if (true) {
  df = sql.read.format("org.apache.hadoop.hbase.spark") 
 .option("hbase.columns.mapping",
   "name STRING :key, email STRING c:email, " +
     "birthDate DATE p:birthDate, height FLOAT p:height")
 .option("hbase.table", "person")
 .option("hbase.spark.use.hbasecontext", false)
 .load()
 }

df.createOrReplaceTempView("personView")val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The result of this snippet is the following Data Frame:
+-----+------+---------------+----------+
| name|height|          email| birthDate|
+-----+------+---------------+----------+
|alice|   4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+

Read data back (Python)

Use the following snippet in pyspark to read the data back:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()

df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()

Test spark-submit

Use the following snippet to test spark-submit commands in Spark cluster mode.

pyspark_app.py:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()

df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
spark.stop()
Test commands:
  • Spark3:
    spark3-submit --deploy-mode cluster --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,` pyspark_app.py

SparkSQL or DataFrames

You need to define the Catalog for the schema mapping between the HBase and Spark tables, prepare the data, populate the HBase table, and then load the HBase DataFrame. Afterward, you can run an integrated query and access records in HBase tables with SQL query. The following illustrates the basic procedure. For more example, see the Apache upstream documentation, SparkSQL/DataFrames.

Define catalog

def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog. One is the rowkey definition and the other is the mapping between the table column in Spark and the column family and column qualifier in HBase. The above defines a schema for an HBase table with the name table1, rowkey as key, and several columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

Save the DataFrame

case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,       
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{                                                                                                             
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""       
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,  
      i,
      i.toLong,
      i.toShort,  
      s"String$i: $t",      
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
 .format("org.apache.hadoop.hbase.spark ")
 .save()

The data represents a local Scala collection that has 256 HBaseRecord objects. The sc.parallelize(data) function distributes data to form an RDD. The toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to external storage systems (for example, HBase here). In the DataFrame with a specified schema catalog, the save function creates an HBase table with 5 regions and saves the DataFrame inside.

Load the DataFrame

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)

In withCatalog function, sqlContext is a variable of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. The read function returns a DataFrameReader that can be used to read data in as a DataFrame. The option function adds input options for the underlying data source to the DataFrameReader, and the format function specifies the input data source format for the DataFrameReader. The load() function loads input in as a DataFrame. The data frame df returned by withCatalog function could be used to access the HBase table, such as 4.4 and 4.5.

Language integrated query

val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show

The DataFrame can do various operations, such as join, sort, select, filter, orderBy and so on. The df.filter function above filters rows using the given SQL expression. The select function selects a set of columns: col0, col1 and col4.

SQL query

df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show

The registerTempTable function registers df DataFrame as a temporary table using the table name table1. The lifetime of this temporary table is tied to the SQLContext that was used to create the data frame df. The sqlContext.sql function allows the user to execute SQL queries.