Example: Using the HBase-Spark connector
Learn how to use the HBase-Spark connector by following an example scenario.
Schema
In this example we store personal data in an HBase table: name, email address, birth date, and
height (as a floating-point number). Contact information (email) is stored in the c column family
and personal information (birth date, height) is stored in the p column family. The row key of the
HBase table is the name attribute.

| | Spark | HBase |
|---|---|---|
| Type/Table | Person | person |
| Name | name: String | key |
| Email address | email: String | c:email |
| Birth date | birthDate: Date | p:birthDate |
| Height | height: Float | p:height |
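The insert and read examples in this document pass this schema to the connector as a single hbase.columns.mapping string. As a minimal sketch of how the table translates into that string, the following Python snippet builds it from a list of field specifications. The names PERSON_FIELDS and build_mapping are hypothetical and are not part of the connector.

```python
# Hypothetical helper, not part of the HBase-Spark connector.
# Each entry is (Spark field, connector type, HBase column); ":key" marks the row key.
PERSON_FIELDS = [
    ("name", "STRING", ":key"),
    ("email", "STRING", "c:email"),
    ("birthDate", "DATE", "p:birthDate"),
    ("height", "FLOAT", "p:height"),
]


def build_mapping(fields):
    """Join (field, type, column) triples into 'field TYPE column, ...'."""
    return ", ".join(
        f"{name} {sql_type} {column}" for name, sql_type, column in fields
    )


mapping = build_mapping(PERSON_FIELDS)
print(mapping)
```

Keeping the field list in one place makes it harder for the write and read mappings to drift apart.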
Create HBase table
Use the following command in the HBase shell to create the HBase table:
shell> create 'person', 'p', 'c'
Insert data (Scala)
Use the following Spark code in spark-shell or spark3-shell to insert data into the HBase table:
val sql = spark.sqlContext

import java.sql.Date

case class Person(name: String,
                  email: String,
                  birthDate: Date,
                  height: Float)

val personDS = Seq(
  Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
  Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

// The enclosing block lets the shell read the multi-line call chain as one expression
if (true) {
  personDS.write.format("org.apache.hadoop.hbase.spark")
    .option("hbase.columns.mapping",
      "name STRING :key, email STRING c:email, " +
        "birthDate DATE p:birthDate, height FLOAT p:height")
    .option("hbase.table", "person")
    .option("hbase.spark.use.hbasecontext", false)
    .save()
}
Insert data (Python)
Use the following Spark code in pyspark or pyspark3 to insert data into the HBase table:
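from datetime import date
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
# Same two rows as in the Scala example
data = [("alice", "alice@alice.com", date(2000, 1, 1), 4.5),
        ("bob", "bob@bob.com", date(2001, 10, 17), 5.1)]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("birthDate", DateType(), True),
    StructField("height", FloatType(), True)])
personDS = spark.createDataFrame(data=data, schema=schema)
(personDS.write.format("org.apache.hadoop.hbase.spark")
    .option("hbase.columns.mapping",
            "name STRING :key, email STRING c:email, "
            "birthDate DATE p:birthDate, height FLOAT p:height")
    .option("hbase.table", "person")
    .option("hbase.spark.use.hbasecontext", False)
    .save())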
Scan data
The previously inserted data can be verified with a simple scan in the HBase shell:
shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@alice.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
bob column=c:email, timestamp=1568723598521, value=bob@bob.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)
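The birthDate and height cells appear as escaped bytes because the values are stored in binary form. The following Python sketch decodes alice's cells from the scan output. It assumes FLOAT is stored as a 4-byte big-endian IEEE 754 value and DATE as an 8-byte big-endian count of milliseconds since the Unix epoch. The byte lengths in the scan output suggest this layout, but the source does not state it.

```python
import struct
from datetime import datetime, timezone

# Raw cell values copied from the scan output, written as Python bytes literals.
alice_height = b"@\x90\x00\x00"
alice_birth = b"\x00\x00\x00\xdcl\x87 \x00"

# Assumption: FLOAT is a 4-byte big-endian float and
# DATE is an 8-byte big-endian epoch-milliseconds value.
height = struct.unpack(">f", alice_height)[0]
birth_millis = struct.unpack(">q", alice_birth)[0]
birth_utc = datetime.fromtimestamp(birth_millis / 1000, tz=timezone.utc)

print(height)                 # 4.5
print(birth_millis)           # 946713600000
print(birth_utc.isoformat())  # 2000-01-01T08:00:00+00:00
```

The decoded instant is 08:00 UTC on 2000-01-01, which is midnight in a UTC-8 time zone. This suggests, but does not confirm, that the date was written as local midnight by a session running in such a zone.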
Read data back (Scala)
Use the following snippet in spark-shell or spark3-shell to read the data back:
val sql = spark.sqlContext
var df = spark.emptyDataFrame
// The enclosing block lets the shell read the multi-line call chain as one expression
if (true) {
  df = sql.read.format("org.apache.hadoop.hbase.spark")
    .option("hbase.columns.mapping",
      "name STRING :key, email STRING c:email, " +
        "birthDate DATE p:birthDate, height FLOAT p:height")
    .option("hbase.table", "person")
    .option("hbase.spark.use.hbasecontext", false)
    .load()
}
df.createOrReplaceTempView("personView")
val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The result of this snippet is the following DataFrame:
+-----+------+---------------+----------+
| name|height|          email| birthDate|
+-----+------+---------------+----------+
|alice|   4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+
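The same read can be sketched in Python for pyspark or pyspark3, mirroring the options of the Scala snippet. The names HBASE_FORMAT, READ_OPTIONS and read_person are hypothetical. The helper takes the session as a parameter and assumes the connector is available to that session.

```python
# Connector format and options copied from the Scala read example.
HBASE_FORMAT = "org.apache.hadoop.hbase.spark"
READ_OPTIONS = {
    "hbase.columns.mapping": (
        "name STRING :key, email STRING c:email, "
        "birthDate DATE p:birthDate, height FLOAT p:height"
    ),
    "hbase.table": "person",
    "hbase.spark.use.hbasecontext": False,
}


def read_person(spark):
    """Load the 'person' HBase table as a DataFrame (hypothetical helper).

    `spark` is the SparkSession that the pyspark shell provides.
    """
    return spark.read.format(HBASE_FORMAT).options(**READ_OPTIONS).load()
```

In the shell, df = read_person(spark) followed by df.createOrReplaceTempView("personView") and spark.sql("SELECT * FROM personView WHERE name = 'alice'").show() should produce the same single-row result as above, assuming the table was populated as described.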
Customizing HBase configuration
In some cases the default HBase configuration is not sufficient, for example when the dataset is located on a different cluster.
In these cases you can alter the HBase configuration by creating an HBaseContext with a custom configuration:
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.HBaseConfiguration

val sql = spark.sqlContext
// Start from the default HBase configuration and override the ZooKeeper quorum
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hbase-1.example.com")
// the latest HBaseContext will be used afterwards
new HBaseContext(spark.sparkContext, conf)
// Parentheses keep the multi-line call chain a single expression in the shell
val df = (sql.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
      "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .load())
df.createOrReplaceTempView("personView")
val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()