Example: Using the HBase-Spark connector
Learn how to use the HBase-Spark connector by following an example scenario.
Schema
In this example we want to store personal data in an HBase table: a name, an email address, a
birth date, and a height as a floating-point number. The contact information (email) is stored
in the c column family, and the personal information (birth date, height) is stored in the p
column family. The row key of the HBase table is the name attribute.

| | Spark | HBase |
|---|---|---|
| Type/Table | Person | person |
| Name | name: String | key |
| Email address | email: String | c:email |
| Birth date | birthDate: Date | p:birthDate |
| Height | height: Float | p:height |
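This schema is what the hbase.columns.mapping option (used in the write and read snippets below) encodes: a comma-separated list of `name TYPE cf:qualifier` entries, where `:key` (no column family) marks the row key. As a sketch of the format only (plain Python, no Spark required), the option string can be assembled from the table above:

```python
# (spark_name, spark_type, hbase_target) triples taken from the schema table;
# ":key" has no column family because it maps to the HBase row key.
columns = [
    ("name", "STRING", ":key"),
    ("email", "STRING", "c:email"),
    ("birthDate", "DATE", "p:birthDate"),
    ("height", "FLOAT", "p:height"),
]

# Each entry is "name TYPE cf:qualifier"; entries are joined with ", ".
mapping = ", ".join(f"{name} {typ} {target}" for name, typ, target in columns)
print(mapping)
# name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height
```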
Create HBase table
Use the following command to create the HBase table:
shell> create 'person', 'p', 'c'
Insert data (Scala)
Use the following Spark code in spark-shell or spark3-shell to insert data into our HBase table:
import java.sql.Date
case class Person(name: String, email: String, birthDate: Date, height: Float)
val personDS = Seq(
  Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
  Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

personDS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .save()
Insert data (Python)
Use the following Spark code in pyspark or pyspark3 to insert data into our HBase table:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
data = [("alice","alice@alice.com", datetime.strptime("2000-01-01",'%Y-%m-%d'), 4.5),
("bob","bob@bob.com", datetime.strptime("2001-10-17",'%Y-%m-%d'), 5.1)
]
schema = StructType([ \
StructField("name",StringType(),True), \
StructField("email",StringType(),True), \
StructField("birthDate", DateType(),True), \
StructField("height", FloatType(), True)
])
personDS = spark.createDataFrame(data=data,schema=schema)
personDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).save()
Scan data
The previously inserted data can be verified with a simple scan:
shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@alice.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
bob column=c:email, timestamp=1568723598521, value=bob@bob.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)
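The non-string cells appear as raw bytes in the shell because HBase stores them in binary form: the scan output above shows the FLOAT as a 4-byte big-endian IEEE 754 value and the DATE as an 8-byte big-endian long of epoch milliseconds (local midnight in the writing machine's timezone, so the exact date bytes vary by timezone). A small sketch (plain Python, no HBase required) decoding alice's cells from the scan above:

```python
import struct

# p:height for alice from the scan output: @\x90\x00\x00
# ('@' is 0x40) -> 4 bytes interpreted as a big-endian float
height = struct.unpack(">f", b"\x40\x90\x00\x00")[0]
print(height)  # 4.5

# p:birthDate for alice: \x00\x00\x00\xDCl\x87 \x00
# ('l' is 0x6C, ' ' is 0x20) -> 8 bytes interpreted as a big-endian long
# holding milliseconds since the Unix epoch
millis = struct.unpack(">q", b"\x00\x00\x00\xdcl\x87 \x00")[0]
print(millis)  # 946713600000, i.e. 2000-01-01 (midnight in the writer's timezone)
```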
Read data back (Scala)
Use the following snippet in spark-shell or spark3-shell to read the data back:
val sql = spark.sqlContext
val df = sql.read.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping",
"name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height")
.option("hbase.table", "person")
.option("hbase.spark.use.hbasecontext", false)
.load()
df.createOrReplaceTempView("personView")
val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The result of this snippet is the following DataFrame:
+-----+------+---------------+----------+
| name|height| email| birthDate|
+-----+------+---------------+----------+
|alice| 4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+
Read data back (Python)
Use the following snippet in pyspark or pyspark3 to read the data back:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()
df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height").option("hbase.table", "person").option("hbase.spark.use.hbasecontext", False).load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()