Example: Using the HBase-Spark connector
Learn how to use the HBase-Spark connector by following an example scenario.
If you followed the instructions in the Configure HBase-Spark connector using Cloudera Manager topic, Cloudera Manager has already configured the connector for Spark. If you have not, add the following parameters to the command line when running the spark-submit, spark3-submit, spark-shell, spark3-shell, pyspark, or pyspark3 commands.
- Spark2:
--conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/scala-library.jar,`hbase mapredcp | tr : ,`
- Spark3:
--conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,`
The hbase-site.xml.jar file referenced above packages your HBase client configuration; create it from the hbase-site.xml file:
jar cf hbase-site.xml.jar hbase-site.xml
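For example, an interactive Spark 3 shell session could be started with the connector configured directly on the command line (the same paths as the Spark 3 configuration above; adjust them to match your deployment):
spark3-shell --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,`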
Schema
In this example, personal data is stored in an HBase table named person. The contact information (email address) is stored in the c column family, and personal information (birth date, height) is stored in the p column family. The key of the HBase table is the name attribute.

| | Spark | HBase |
|---|---|---|
| Type/Table | Person | person |
| Name | name: String | key |
| Email address | email: String | c:email |
| Birth date | birthDate: Date | p:birthDate |
| Height | height: Float | p:height |
Create HBase table
shell> create 'person', 'p', 'c'
Insert data (Scala)
val sql = spark.sqlContext

import java.sql.Date
import spark.implicits._  // needed for .toDS (imported automatically in spark-shell)

case class Person(name: String,
                  email: String,
                  birthDate: Date,
                  height: Float)

val personDS = Seq(
  Person("alice", "alice@alice.com", Date.valueOf("2000-01-01"), 4.5f),
  Person("bob", "bob@bob.com", Date.valueOf("2001-10-17"), 5.1f)
).toDS

personDS.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .save()
Insert data (Python)
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType

data = [("alice", "alice@alice.com", datetime.strptime("2000-01-01", '%Y-%m-%d'), 4.5),
        ("bob", "bob@bob.com", datetime.strptime("2001-10-17", '%Y-%m-%d'), 5.1)]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("birthDate", DateType(), True),
    StructField("height", FloatType(), True)
])

personDS = spark.createDataFrame(data=data, schema=schema)

personDS.write.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping",
            "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height") \
    .option("hbase.table", "person") \
    .option("hbase.spark.use.hbasecontext", False) \
    .save()
Scan data
shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@alice.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\x90\x00\x00
bob column=c:email, timestamp=1568723598521, value=bob@bob.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xA333
2 row(s)
The birthDate and height cells appear as raw bytes in the HBase shell because the connector stores them in a binary encoding; they are decoded back into Date and Float values when read through Spark.
Read data back (Scala)
val sql = spark.sqlContext

val df = sql.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping",
    "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  .option("hbase.table", "person")
  .option("hbase.spark.use.hbasecontext", false)
  .load()

df.createOrReplaceTempView("personView")

val results = sql.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
+-----+------+---------------+----------+
| name|height| email| birthDate|
+-----+------+---------------+----------+
|alice| 4.5|alice@alice.com|2000-01-01|
+-----+------+---------------+----------+
Read data back (Python)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping",
            "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height") \
    .option("hbase.table", "person") \
    .option("hbase.spark.use.hbasecontext", False) \
    .load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
Test spark-submit or spark3-submit
Use the following snippet, saved as pyspark_app.py, to test the spark-submit or spark3-submit command in Spark cluster mode.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test HBase Connector from Python").getOrCreate()
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping",
            "name STRING :key, email STRING c:email, birthDate DATE p:birthDate, height FLOAT p:height") \
    .option("hbase.table", "person") \
    .option("hbase.spark.use.hbasecontext", False) \
    .load()
df.createOrReplaceTempView("personView")
results = spark.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
spark.stop()
- Spark2:
spark-submit --deploy-mode cluster --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/scala-library.jar,`hbase mapredcp | tr : ,` pyspark_app.py
- Spark3:
spark3-submit --deploy-mode cluster --conf spark.jars=/path/to/hbase-site.xml.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors_for_spark3/lib/hbase-spark3-protocol-shaded.jar,`hbase mapredcp | tr : ,` pyspark_app.py
SparkSQL or DataFrames
You need to define a catalog for the schema mapping between the HBase and Spark tables, prepare the data, populate the HBase table, and then load it into a DataFrame. Afterward, you can run integrated queries and access the records in the HBase table with SQL. The following sections illustrate the basic procedure. For more examples, see the Apache upstream documentation, SparkSQL/DataFrames.
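The Scala snippets below are written for a spark-shell or spark3-shell session started with the connector on the classpath, as described earlier. They also assume the connector's catalog helper is imported; in the Apache hbase-connectors build this import typically looks as follows:
// Catalog helper used with the HBaseTableCatalog.tableCatalog option below
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.DataFrame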
Define catalog
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
The catalog defines a mapping between the HBase and Spark tables. There are two critical parts to this catalog: the rowkey definition, and the mapping between the table columns in Spark and the column family and column qualifier in HBase. The above defines a schema for an HBase table named table1, with key as the rowkey and several columns (col1 - col8). Note that the rowkey also has to be defined in detail as a column (col0), which has a specific cf (rowkey).
Save the DataFrame
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord
{
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
The data represents a local Scala collection of 256 HBaseRecord objects. The sc.parallelize(data) call distributes the data to form an RDD, and toDF returns a DataFrame. The write function returns a DataFrameWriter used to write the DataFrame to external storage systems (in this case, HBase). Given the DataFrame and the specified schema catalog, the save function creates an HBase table with 5 regions and saves the DataFrame into it.
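As a quick sanity check, the table can be loaded back through the same catalog and counted. This is a sketch that assumes the same shell session and the HBaseTableCatalog import mentioned above:
// Read the freshly written table back using the same catalog and count the rows.
val written = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .load()
println(written.count())  // 256 records were generated above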
Load the DataFrame
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val df = withCatalog(catalog)
In the withCatalog function, sqlContext is a variable of SQLContext, which is the entry point for working with structured data (rows and columns) in Spark. The read function returns a DataFrameReader that can be used to read data in as a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader, and the format function specifies the input data source format for the DataFrameReader. The load() function loads the input as a DataFrame. The DataFrame df returned by the withCatalog function can then be used to access the HBase table, as in the language integrated query and SQL query examples that follow.
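To confirm that the catalog mapping was applied, the loaded DataFrame can be inspected with standard Spark operations (nothing connector-specific is involved):
df.printSchema()  // shows col0 through col8 with the types declared in the catalog
df.show(5)        // displays the first rows read from HBase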
Language integrated query
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005" ||
$"col0" <= "row005")
.select("col0", "col1", "col4")
s.show
The DataFrame supports various operations, such as join, sort, select, filter, orderBy, and so on. The df.filter function above filters rows using the given SQL expression, and the select function selects a set of columns: col0, col1, and col4.
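The other operations mentioned above work the same way. For example, a short sketch of sorting the filtered result with orderBy (using the columns defined in the catalog):
// Sort the filtered selection by col4 in descending order before displaying it.
val sorted = s.orderBy($"col4".desc)
sorted.show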
SQL query
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
The registerTempTable function registers the df DataFrame as a temporary table named table1. The lifetime of this temporary table is tied to the SQLContext that was used to create df. The sqlContext.sql function allows the user to execute SQL queries against it.
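Note that registerTempTable has been deprecated since Spark 2.0; on current Spark versions the equivalent call is createOrReplaceTempView, as used in the person example earlier:
df.createOrReplaceTempView("table1")
sqlContext.sql("select count(col1) from table1").show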