Kudu integration with Spark
Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include the kudu-spark dependency using the --packages or --jars option.
Note that Spark 1 is no longer supported as of Kudu 1.6.0; Kudu 1.5.0 is the last release that works with Spark 1.
Use the kudu-spark3_2.12 artifact if using Spark 3 with Scala 2.12.
- For the --packages option:
spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
- For the --jars option:
spark-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
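If you build a Spark application instead of using the shell, the same dependency can be declared in your build tool. Below is a minimal build.sbt sketch; the repository and version placeholder mirror the --packages invocation above, and with scalaVersion 2.12 the %% operator resolves to the kudu-spark3_2.12 artifact:
// build.sbt (sketch): mirrors the --packages/--repositories flags shown above
resolvers += "cloudera-repos" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
libraryDependencies += "org.apache.kudu" %% "kudu-spark3" % "<kudu-cdp-version>"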
Below is a minimal Spark SQL "select" example for a Kudu table created with Impala in the "default" database. You first import the kudu-spark package, then create a DataFrame, and then create a view from the DataFrame. After those steps, the table is accessible from Spark SQL. You can also refer to the Spark quickstart guide or this Kudu-Spark example.
import org.apache.kudu.spark.kudu._
// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051",
  "kudu.table" -> "default.my_table")).format("kudu").load
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")
// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
Below is a more sophisticated example that includes both reads and writes:
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
// Needed for the $"column" syntax used below; spark-shell imports this automatically.
import spark.implicits._
// Read a table from Kudu
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
  .format("kudu").load
// Query using the Spark API...
df.select("key").filter("key >= 5").show()
// ...or register a temporary table and use SQL
df.createOrReplaceTempView("kudu_table")
val filteredDF = spark.sql("select key from kudu_table where key >= 5")
filteredDF.show()
// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)
// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
  "test_table", df.schema, Seq("key"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("key").asJava, 3))
// Check for the existence of a Kudu table
kuduContext.tableExists("test_table")
// Insert data
kuduContext.insertRows(df, "test_table")
// Delete data
kuduContext.deleteRows(df, "test_table")
// Upsert data
kuduContext.upsertRows(df, "test_table")
// Update data
val updateDF = df.select($"key", ($"int_val" + 1).as("int_val"))
kuduContext.updateRows(updateDF, "test_table")
// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table"))
  .mode("append")
  .format("kudu").save
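// As a concrete sketch of the insert variant mentioned above, pass the
// "kudu.operation" option in the options map; verify the option key against
// your kudu-spark version.
df.write
  .options(Map("kudu.master" -> "kudu.master:7051",
    "kudu.table" -> "test_table",
    "kudu.operation" -> "insert"))
  .mode("append")
  .format("kudu").save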
// Delete a Kudu table
kuduContext.deleteTable("test_table")
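The examples above use a single master address for brevity. In a multi-master deployment, the kudu.master option accepts a comma-separated list of master RPC addresses; a short sketch with hypothetical hostnames:
val multiMasterDF = spark.read
  .options(Map(
    "kudu.master" -> "master-1:7051,master-2:7051,master-3:7051", // hypothetical hosts
    "kudu.table" -> "kudu_table"))
  .format("kudu").load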