Example: Using the HBase-Spark connector
Learn how to use the HBase-Spark connector by following an example scenario.
Schema
In this example we want to store personal data in an HBase table: a name, an email address, a birth date, and a height as a floating-point number. The contact information (email) is stored in the c column family, and the personal information (birth date, height) is stored in the p column family. The key of the HBase table is the name attribute.

| | Spark | HBase |
|---|---|---|
| Type/Table | Person | person |
| Name | name: String | key |
| Email address | email: String | c:email |
| Birth date | birthDate: Date | p:birthDate |
| Height | height: Float | p:height |
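This table translates directly into the hbase.columns.mapping option used in the snippets below. Each comma-separated entry has the form "spark-column TYPE column-family:qualifier", and the special qualifier :key marks the row key:

name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height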
Create HBase table
Use the following command to create the HBase table:
shell> create 'person', 'p', 'c'
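Optionally, you can verify that the table and both column families were created with the describe shell command:

shell> describe 'person'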
Insert data
Use the following Spark code in spark-shell to insert data into the HBase table:
val sql = spark.sqlContext

import java.sql.Date

case class Person(name: String,
                  email: String,
                  birthDate: Date,
                  height: Float)

// spark-shell imports the implicits needed by .toDS automatically;
// in a compiled application add: import spark.implicits._
val personDS = Seq(
  Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
  Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

personDS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .save()
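The snippet above relies on spark-shell creating the SparkSession and importing its implicits automatically. As a rough sketch, the same write in a standalone application could look like this (the object name and the extra row are illustrative only, not part of the example above):

import java.sql.Date
import org.apache.spark.sql.SparkSession

// Illustrative standalone-application skeleton; names and data are assumptions.
object PersonWriter {
  case class Person(name: String, email: String, birthDate: Date, height: Float)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hbase-spark-person-writer")
      .getOrCreate()
    import spark.implicits._  // provides .toDS outside spark-shell

    Seq(Person("carol", "carol@example.com", Date.valueOf("1999-05-05"), 5.4f))
      .toDS
      .write.format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping",
        "name STRING :key, email STRING c:email, " +
        "birthDate DATE p:birthDate, height FLOAT p:height")
      .option("hbase.table", "person")
      .option("hbase.spark.use.hbasecontext", false)
      .save()

    spark.stop()
  }
}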
Scan data
The previously inserted data can be verified with a simple scan:

shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@alice.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
bob column=c:email, timestamp=1568723598521, value=bob@bob.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)
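The birthDate and height values look opaque because the connector stores them in HBase's Bytes encoding: the float as its 4 IEEE-754 bytes, and the date (judging by the 8-byte cells above) as an epoch-millisecond long. As a small sketch, alice's height cell can be decoded by hand:

import org.apache.hadoop.hbase.util.Bytes

// 0x40 0x90 0x00 0x00 is the IEEE-754 encoding of 4.5f,
// matching the p:height cell shown for alice above.
val heightBytes = Array(0x40, 0x90, 0x00, 0x00).map(_.toByte)
println(Bytes.toFloat(heightBytes))  // prints 4.5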
Read data back
Use the following snippet in spark-shell to read the data back:
val sql = spark.sqlContext

val df = sql.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .load()

df.createOrReplaceTempView("personView")

val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The result of this snippet is the following DataFrame:
+-----+------+---------------+----------+
| name|height| email| birthDate|
+-----+------+---------------+----------+
|alice| 4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+
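The same query can also be expressed with the DataFrame API instead of SQL; depending on the connector version, a simple predicate like this may even be pushed down to HBase as a filter:

df.where("name = 'alice'").show()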
Customizing HBase configuration
In some cases the default HBase configuration is insufficient, for example when the dataset is located on a different cluster. In such cases the HBase configuration can be altered:
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hbase-1.example.com")

// The most recently created HBaseContext is the one the connector
// uses for subsequent reads and writes.
new HBaseContext(spark.sparkContext, conf)

val sql = spark.sqlContext

val df = sql.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .load()

df.createOrReplaceTempView("personView")

val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
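If the remote cluster needs more settings than the ZooKeeper quorum, one option is to load its complete client configuration file instead of setting properties one by one. A minimal sketch, assuming a local copy of the remote cluster's hbase-site.xml at a hypothetical path:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

val remoteConf = HBaseConfiguration.create()
// Hypothetical path to a copy of the remote cluster's client configuration.
remoteConf.addResource(new Path("/etc/hbase/remote-cluster/hbase-site.xml"))

// As above, the most recently created HBaseContext is the one the connector uses.
new HBaseContext(spark.sparkContext, remoteConf)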