Understanding Apache Phoenix-Spark Connector
You can use the Apache Phoenix-Spark plugin on your secured clusters to perform READ and WRITE operations.
Connect to a secure cluster
You can connect to a secured cluster using the Phoenix JDBC connector. Enter the following syntax in the shell:
jdbc:phoenix:<ZK hostnames>:<ZK port>:<root znode>:<principal name>:<keytab file location>
jdbc:phoenix:h1.cdh.local,h2.cdh.local,h3.cdh.local:2181:/hbase-secure:user1@cdh.LOCAL:/Users/user1/keytabs/myuser.headless.keytab
The principal and keytab parameters are required only if you have not run kinit before starting the job and want Phoenix to log you in automatically.
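The URL layout above can be sketched as a small Python helper that joins the parts with colons and appends the principal and keytab only when they are supplied. The function name phoenix_jdbc_url and its parameter names are hypothetical, and the check that both values are supplied together is an assumption of this sketch rather than a documented rule.

```python
from typing import Optional, Sequence


def phoenix_jdbc_url(
    zk_hosts: Sequence[str],
    zk_port: int,
    root_znode: str,
    principal: Optional[str] = None,
    keytab: Optional[str] = None,
) -> str:
    """Assemble a Phoenix JDBC URL from its colon-separated parts."""
    # Assumption of this sketch: principal and keytab are supplied as a pair.
    if (principal is None) != (keytab is None):
        raise ValueError("principal and keytab must be given together")
    # ZooKeeper hosts are comma-separated; every other part is colon-separated.
    parts = ["jdbc:phoenix", ",".join(zk_hosts), str(zk_port), root_znode]
    if principal is not None:
        parts += [principal, keytab]
    return ":".join(parts)


# With principal and keytab: Phoenix performs the login.
secure_url = phoenix_jdbc_url(
    ["h1.cdh.local", "h2.cdh.local", "h3.cdh.local"],
    2181,
    "/hbase-secure",
    principal="user1@cdh.LOCAL",
    keytab="/Users/user1/keytabs/myuser.headless.keytab",
)

# Without them: for jobs started after kinit has already been run.
kinit_url = phoenix_jdbc_url(["h1.cdh.local"], 2181, "/hbase-secure")

print(secure_url)
print(kinit_url)
```

The first call reproduces the example URL shown above; the second shows the shorter form without the login parameters.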
Considerations for setting up Spark
- Before you can use the Phoenix-Spark connector in your Spark programs, you must configure your Maven settings to include a repository that points to the password-protected repository at
https://archive.cloudera.com/phoenix/6.2/maven-repository/ and use the following dependency:
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>5.0.0-cdh6.2.0</version>
  <scope>provided</scope>
</dependency>
Configure Phoenix-Spark Connector using Cloudera Manager
Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)
- Go to the Spark service.
- Click the Configuration tab.
- Select .
- Select .
- Locate the Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-defaults.conf property or search for it by typing its name in the Search box.
- Add the following properties to ensure that all required Phoenix and HBase platform dependencies are available on the classpath for the Spark executors and drivers:
Phoenix client JARs:
spark.executor.extraClassPath=phoenix-<version>-client.jar
spark.driver.extraClassPath=phoenix-<version>-client.jar
Phoenix-Spark JARs:
spark.executor.extraClassPath=phoenix-spark-<version>.jar
spark.driver.extraClassPath=phoenix-spark-<version>.jar
- Enter a Reason for change, and then click Save Changes to commit the changes.
- Restart the role and service when Cloudera Manager prompts you to restart.
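The Phoenix client and Phoenix-Spark JAR properties in the steps above assign the same two keys. Assuming the snippet is read as an ordinary properties file, where a later assignment to a key replaces an earlier one, one option is to list both JARs in a single value per key, separated by a colon (the classpath separator on Linux). The following is a minimal sketch of that idea, not a documented Cloudera procedure; the helper name extra_classpath_properties is hypothetical, and the JAR names are the placeholders used above.

```python
from typing import List, Sequence


def extra_classpath_properties(jar_paths: Sequence[str]) -> List[str]:
    """Return spark-defaults.conf lines that put every JAR on both classpaths."""
    # Classpath entries are joined with ':' (the Linux path separator).
    classpath = ":".join(jar_paths)
    return [
        f"spark.executor.extraClassPath={classpath}",
        f"spark.driver.extraClassPath={classpath}",
    ]


# Placeholder file names copied from the steps above; substitute real paths.
lines = extra_classpath_properties(
    ["phoenix-<version>-client.jar", "phoenix-spark-<version>.jar"]
)
print("\n".join(lines))
```

The printed lines can be pasted into the safety valve in place of the four separate assignments.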
Phoenix-Spark Connector Usage Examples
You can refer to the following Phoenix-Spark connector examples:
- Reading Phoenix tables
- Saving Phoenix tables
- Using PySpark to READ and WRITE tables
Reading Phoenix tables
Suppose you have a Phoenix table created with the DDL shown below. You can load the table using one of the following methods:
- As a DataFrame using the Data Source API
- As a DataFrame using a configuration object
- As an RDD using a ZooKeeper URL
CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
Example: Load a DataFrame using the Data Source API
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)

df
  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
Example: Load as a DataFrame directly using a Configuration object
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Loads the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show
Example: Load as an RDD using a ZooKeeper URL
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")

// Loads the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

rdd.count()

// Each row is a Map keyed by column name, so values must be cast
val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]
Saving Phoenix tables
You can refer to the following examples for saving RDDs and DataFrames.
Example: Saving RDDs
Suppose you have a Phoenix table with the following DDL. You can save an RDD to it.
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
The saveToPhoenix method is an implicit method on RDD[Product], or an RDD of Tuples. The data types must correspond to one of the Java types supported by Phoenix.
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
Example: Saving DataFrames
The save method on DataFrame allows passing in a data source type. You can use org.apache.phoenix.spark, and you must also pass in the table and zkUrl parameters to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame’s schema field names, and must match the Phoenix column names. The save method also takes a SaveMode option, for which only SaveMode.Overwrite is supported. For example, if you have two Phoenix tables with the following DDL, you can save DataFrames to them.
Using PySpark to READ and WRITE tables
With Spark’s DataFrame support, you can use PySpark to READ from and WRITE to Phoenix tables.
Example: Load a DataFrame
Given a table TABLE1 and a ZooKeeper URL of localhost:2181, you can load the table as a DataFrame using the following Python code in pyspark:
df = sqlContext.read \
  .format("org.apache.phoenix.spark") \
  .option("table", "TABLE1") \
  .option("zkUrl", "localhost:2181") \
  .load()
Example: Save a DataFrame
Given the same table and ZooKeeper URL as above, you can save a DataFrame to a Phoenix table using the following code:
df.write \
  .format("org.apache.phoenix.spark") \
  .mode("overwrite") \
  .option("table", "TABLE1") \
  .option("zkUrl", "localhost:2181") \
  .save()
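The load and save examples above use the same data source name and the same two options, so they can be wrapped in small helpers to keep the table name and ZooKeeper URL in one place. The following is a minimal sketch; the names PHOENIX_FORMAT, phoenix_options, load_table, and save_table are hypothetical, and the two wrapper functions assume an existing SQLContext and DataFrame from a pyspark session.

```python
from typing import Dict

# Data source name used by both examples above.
PHOENIX_FORMAT = "org.apache.phoenix.spark"


def phoenix_options(table: str, zk_url: str) -> Dict[str, str]:
    """Return the options shared by the load and save examples."""
    return {"table": table, "zkUrl": zk_url}


def load_table(sql_context, table: str, zk_url: str):
    """Load a Phoenix table as a DataFrame using an existing SQLContext."""
    return (
        sql_context.read
        .format(PHOENIX_FORMAT)
        .options(**phoenix_options(table, zk_url))
        .load()
    )


def save_table(df, table: str, zk_url: str) -> None:
    """Save a DataFrame to a Phoenix table; overwrite is the only supported mode."""
    (
        df.write
        .format(PHOENIX_FORMAT)
        .mode("overwrite")
        .options(**phoenix_options(table, zk_url))
        .save()
    )
```

In a pyspark session, load_table(sqlContext, "TABLE1", "localhost:2181") would correspond to the load example, and save_table(df, "TABLE1", "localhost:2181") to the save example.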