Connecting Cloudera Data Engineering to Apache Phoenix
Apache Phoenix acts as a relational database engine and a programmatic, ANSI-compliant SQL layer on top of Apache HBase within the Cloudera ecosystem. When integrated with Cloudera Data Engineering, it enables data engineers to run high-performance Apache Spark batch jobs and pipelines against the Cloudera Operational Database using familiar SQL syntax rather than complex NoSQL APIs. You can connect to Phoenix on the base cluster from Cloudera Data Engineering.
- You must download the following JAR files to compile the Scala application:
- hbase-shaded-mapreduce-[***HBASE-CLOUDERA-RUNTIME-VERSION***].jar
- phoenix5-spark3-shaded-[***PHOENIX-CLOUDERA-RUNTIME-VERSION***].jar
- spark-sql_2.12-[***SPARK3-CLOUDERA-RUNTIME-VERSION***].jar
- opentelemetry-api-0.12.0.jar
- spark-catalyst_2.12-[***SPARK3-CLOUDERA-RUNTIME-VERSION***].jar
- opentelemetry-context-0.12.0.jar
- spark-core_2.12-[***SPARK3-CLOUDERA-RUNTIME-VERSION***].jar
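The source does not name a build tool. Assuming sbt, a minimal build definition can compile against the downloaded JARs by copying them into a lib/ directory, which sbt adds to the classpath as unmanaged dependencies. The _2.12 suffix in the Spark JAR names means the application must be compiled with Scala 2.12; the exact patch version and the project name below are assumptions, not values from the Cloudera runtime.

```scala
// build.sbt: a minimal sketch, assuming sbt is the build tool.
// The Spark JARs are built for Scala 2.12; the patch version is an assumption.
ThisBuild / scalaVersion := "2.12.15"

lazy val root = (project in file("."))
  .settings(
    name := "phoenix-cde-example", // hypothetical project name
    // sbt puts every JAR found in this directory on the compile classpath.
    // lib/ is already the default; it is set explicitly here for clarity.
    unmanagedBase := baseDirectory.value / "lib"
  )
```

Running `sbt package` then produces an application JAR containing only the project's own classes, not the downloaded dependencies.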
- You must get the hbase-site.xml file from the base cluster.
- Open your terminal and log into the HBase Gateway node using your SSH credentials.
- Change your current working directory to /etc/hbase/conf and list the files in the directory.
```
$ cd /etc/hbase/conf
$ ls
atlas-application.properties  __cloudera_metadata__  hbase-env.sh    hdfs-site.xml  log4j.properties  ssl-client.xml
__cloudera_generation__       core-site.xml          hbase-site.xml  jaas.conf      ozone-site.xml
```
- Copy the hbase-site.xml file.
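The copied hbase-site.xml records the ZooKeeper settings of the base cluster, such as hbase.zookeeper.quorum. As one illustration of how it can be used, the following sketch reads those properties with the JDK's built-in XML parser and assembles the `<quorum>:<port>:<znode>` string that goes after `jdbc:phoenix:` in a Phoenix JDBC URL. The object `PhoenixUrlFromHBaseSite` is hypothetical, not a Cloudera or Phoenix API. It assumes the quorum value lists host names without ports, and it falls back to the HBase defaults (2181 and /hbase) only when the port or parent znode property is absent.

```scala
import java.io.File
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper: derives the ZooKeeper part of a Phoenix JDBC URL
// from a copy of hbase-site.xml.
object PhoenixUrlFromHBaseSite {

  // Collects every <property><name>...</name><value>...</value></property> pair.
  def readProperties(file: File): Map[String, String] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(file)
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength).flatMap { i =>
      val prop = props.item(i).asInstanceOf[Element]
      // Text of the first child element with the given tag, if present.
      def child(tag: String): Option[String] = {
        val nodes = prop.getElementsByTagName(tag)
        if (nodes.getLength > 0) Some(nodes.item(0).getTextContent.trim) else None
      }
      for (name <- child("name"); value <- child("value")) yield name -> value
    }.toMap
  }

  // Builds <quorum>:<port>:<znode>. Assumes the quorum lists host names only;
  // 2181 and /hbase are the HBase defaults, used when a property is absent.
  def zkUrl(props: Map[String, String]): String = {
    val quorum = props.getOrElse("hbase.zookeeper.quorum",
      throw new IllegalArgumentException("hbase.zookeeper.quorum is not set"))
    val port = props.getOrElse("hbase.zookeeper.property.clientPort", "2181")
    val znode = props.getOrElse("zookeeper.znode.parent", "/hbase")
    s"$quorum:$port:$znode"
  }

  def jdbcUrl(props: Map[String, String]): String = s"jdbc:phoenix:${zkUrl(props)}"
}
```

For example, `PhoenixUrlFromHBaseSite.zkUrl(PhoenixUrlFromHBaseSite.readProperties(new File("hbase-site.xml")))` returns a value that could stand in for the ZooKeeper URL placeholder in the Scala application.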
- You must grant the workload user read, write, execute, and create permissions on the Phoenix table from the Ranger UI. For more information, see Configure a resource-based policy: HBase.
Also grant the user read, write, and execute permissions on the SYSTEM:STATS Phoenix table. For more information, see Phoenix - Part 4 - working with Ranger.
- Create a Phoenix table using one of the following methods:
- Using the phoenix-sqlline tool:
- Open your terminal and log into the HBase Gateway node using your SSH credentials.
- Run the kinit command using either the HBase administrator principal or a workload user principal that has access permissions to Phoenix.
- Connect to the cluster using the phoenix-sqlline tool and create the table.
```
$ phoenix-sqlline
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect -p driver org.apache.phoenix.jdbc.PhoenixDriver -p user "none" -p password "none" "jdbc:phoenix:"
Connecting to jdbc:phoenix:
26/03/24 20:07:06 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-phoenix.properties,hadoop-metrics2.properties
Connected to: Phoenix (version 5.1)
Driver: PhoenixEmbeddedDriver (version 5.1)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
sqlline version 1.9.0
0: jdbc:phoenix:> CREATE TABLE IF NOT EXISTS employees (id INTEGER NOT NULL PRIMARY KEY, name VARCHAR);
No rows affected (0.015 seconds)
0: jdbc:phoenix:> SELECT TABLE_NAME FROM SYSTEM.CATALOG WHERE TABLE_TYPE='u';
+------------+
| TABLE_NAME |
+------------+
| EMPLOYEES  |
+------------+
1 row selected (0.028 seconds)
0: jdbc:phoenix:> SELECT COLUMN_NAME, DATA_TYPE FROM SYSTEM.CATALOG WHERE TABLE_NAME='EMPLOYEES';
+-------------+-----------+
| COLUMN_NAME | DATA_TYPE |
+-------------+-----------+
|             | null      |
|             | null      |
| ID          | 4         |
| NAME        | 12        |
+-------------+-----------+
4 rows selected (0.016 seconds)
```
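The DATA_TYPE column of SYSTEM.CATALOG holds numeric JDBC type codes rather than type names. These codes are java.sql.Types constants (4 is INTEGER, 12 is VARCHAR), so the JDK's `java.sql.JDBCType` enum can translate them. The following small sketch decodes the sample rows from the sqlline output; the object name `CatalogDataTypes` is hypothetical.

```scala
import java.sql.JDBCType

// Sketch: turns SYSTEM.CATALOG DATA_TYPE codes (java.sql.Types constants)
// into readable JDBC type names.
object CatalogDataTypes {
  // None represents a null DATA_TYPE, as in the rows without a COLUMN_NAME.
  def describe(dataType: Option[Int]): String =
    dataType.map(code => JDBCType.valueOf(code).getName).getOrElse("null")

  def main(args: Array[String]): Unit = {
    // (COLUMN_NAME, DATA_TYPE) pairs copied from the sqlline output.
    val rows: Seq[(String, Option[Int])] =
      Seq("" -> None, "" -> None, "ID" -> Some(4), "NAME" -> Some(12))
    rows.foreach { case (column, code) => println(f"$column%-6s ${describe(code)}") }
  }
}
```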
- Using Cloudera Data Engineering jobs:
- Use the following Scala application code to create a Phoenix table in the default namespace or schema. The workload user must have access to create tables within the default namespace or schema.
```scala
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.SparkSession

object PhoenixCreate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Phoenix-Scala-DDL-Write")
      .getOrCreate()

    // Replace the placeholder with the ZooKeeper URL of the base cluster
    val zkUrl = "[***ZOOKEEPER-URL***]"
    val jdbcUrl = s"jdbc:phoenix:$zkUrl"
    val tableName = "employees"

    // Phoenix stores unquoted identifiers in upper case, so the table appears as EMPLOYEES
    val createTableSql =
      s"""
         |CREATE TABLE IF NOT EXISTS $tableName (
         |  id INTEGER NOT NULL PRIMARY KEY,
         |  name VARCHAR
         |)
         |""".stripMargin

    var conn: Connection = null
    try {
      // Open a JDBC connection through the Phoenix driver and run the DDL
      conn = DriverManager.getConnection(jdbcUrl)
      val stmt = conn.createStatement()
      try stmt.executeUpdate(createTableSql)
      finally stmt.close() // release the statement even if the DDL fails
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (conn != null) conn.close()
    }

    spark.stop()
  }
}
```
- Using the
