Example: Using the HBase-Spark connector
Learn how to use the HBase-Spark connector by following an example scenario.
If you follow the instructions mentioned in Configure HBase-Spark connector using Cloudera Manager topic, Cloudera Manager automatically configures the connector for Spark. If you have not, add the following parameters to the command line while running spark-submit, spark3-submit, spark-shell, spark3-shell, pyspark, or pyspark3 commands.
- Spark2:
--conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/scala-library.jar,`hbase mapredcp | tr : ,`
- Spark3:
--conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,`
You can use the following command to create the hbase-site.xml.jar file.
The hbase-site.xml is added to the classpath with the
spark.jars parameter because it is part of the jar file’s root
path.
jar cf hbase-site.xml.jar hbase-site.xml
Schema
In this example we want to store personal data in an HBase table. We want to store name, email
address, birth date and height as a floating point number. The contact information (email) is
stored in the
c
column family and personal information (birth date, height) is
stored in the p
column family. The key in HBase table will be the
name
attribute.Spark | HBase | |
---|---|---|
Type/Table | Person |
person |
Name | name: String |
key |
Email address | email: String |
c:email |
Birth date | birthDate: Date |
p:birthDate |
Height | height: Float |
p:height |
Create HBase table
Use the following command to create the HBase
table:
shell> create 'person', 'p', 'c'
Insert data (Scala)
Use the following spark code in spark-shell or spark3-shell to insert data into our HBase
table:
val sql = spark.sqlContext import java.sql.Date case class Person(name: String, email: String, birthDate: Date, height: Float) var personDS = Seq(Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f), Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)).toDS var personDS = Seq( Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f), Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f) ).toDS if (true) { personDS.write.format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " + "birthDate DATE p:birthDate, height FLOAT p:height") .option("hbase.table", "person") .option("hbase.spark.use.hbasecontext", false) .save() }
Insert data (Python)
Use the following spark code in pyspark or pyspark3 to insert data into our HBase
table:
from datetime import datetime from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType data = [("alice","alice@alice.com", datetime.strptime("2000-01-01",'%Y-%m-%d'), 4.5), ("bob","bob@bob.com", datetime.strptime("2001-10-17",'%Y-%m-%d'), 5.1) ] schema = StructType([ \ StructField("name",StringType(),True), \ StructField("email",StringType(),True), \ StructField("birthDate", DateType(),True), \ StructField("height", FloatType(), True) ]) personDS = spark.createDataFrame(data=data,schema=schema) personDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).save()
Scan data
The previously inserted data can be tested with a simple
scan:
shell> scan ‘person’ ROW COLUMN+CELL alice column=c:email, timestamp=1568723598292, value=alice@alice.com alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00 alice column=p:height, timestamp=1568723598292, value=@\x90\x00\x00 bob column=c:email, timestamp=1568723598521, value=bob@bob.com bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80 bob column=p:height, timestamp=1568723598521, value=@\xA333 2 row(s)
Read data back (Scala)
Use the following snippet in spark-shell or spark3-shell to read the data
back:
val sql = spark.sqlContext var df = spark.emptyDataFrame if (true) { df = sql.read.format("org.apache.hadoop.hbase.spark") .option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " + "birthDate DATE p:birthDate, height FLOAT p:height") .option("hbase.table", "person") .option("hbase.spark.use.hbasecontext", false) .load() } df.createOrReplaceTempView("personView")val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'") results.show()
The result of this snippet is the following Data
Frame:
+-----+------+---------------+----------+ | name|height| email| birthDate| +-----+------+---------------+----------+ |alice| 4.5|alice@alice.com|2000-01-01| +-----+------+---------------+----------+
Read data back (Python)
Use the following snippet in pyspark or pyspark3 to read the data
back:
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate() df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load() df.createOrReplaceTempView("personView") results = spark.sql("SELECT * FROM personView WHERE name = 'alice'") results.show()