Sample code to connect to Phoenix data connector
Sample application code for connecting to a Phoenix data connector in Cloudera Data Engineering.
The following example shows the application code to read, write, and run queries on a Phoenix table for a Cloudera Data Engineering job:
import java.sql.DriverManager
import org.apache.spark.sql.{SparkSession, SaveMode}
import java.util.Arrays
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
object PhoenixReadWriteExecute {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Phoenix-Scala-Read-Write-Execute")
.getOrCreate()
val zkUrl = "[***ZOOKEEPER-URL***]"
val jdbcUrl = s"jdbc:phoenix:$zkUrl"
val tableName = "employees"
val data = Arrays.asList(
Row(1, "Alice"),
Row(2, "Bob"),
Row(3, "Peter"),
Row(4, "John")
)
val schema = StructType(Array(
StructField("id", IntegerType, false),
StructField("name", StringType, false)
))
val df = spark.createDataFrame(data, schema)
df.write
.format("phoenix")
.mode(SaveMode.Append)
.option("table", tableName)
.option("zkUrl", zkUrl)
.save()
val dfo = spark.read
.format("phoenix")
.option("table", "employees")
.option("zkUrl", zkUrl)
.load()
dfo.show()
val conn = DriverManager.getConnection(jdbcUrl)
val stmt = conn.createStatement()
val result = stmt.executeQuery("SELECT COUNT(*) FROM employees")
if (result.next()) {
val count = result.getInt(1)
println(s"Total employees: $count")
}
stmt.close()
conn.close()
spark.stop()
}
}
