Developing Applications with Apache Kudu

Kudu integration with Spark

Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include the kudu-spark dependency using the --packages option.

Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Kudu dropped support for Spark 1 in version 1.6.0, so Kudu 1.5.0 is the latest version that can be used with Spark 1.

spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0-cdh5.13.91 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/

Use the kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11.

spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
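
For a packaged application, as opposed to an interactive shell session, the same artifact could be declared in a build definition instead of being passed with --packages. The following is a minimal sketch that assumes sbt as the build tool, which this page does not cover. The coordinates and repository URL are taken from the spark2-shell command above; the Scala patch version is an assumption.

```scala
// build.sbt (sketch; the use of sbt is an assumption, not part of the original instructions)

// Assumed Scala 2.11 patch release; match it to the Scala version of your Spark 2 build.
scalaVersion := "2.11.12"

// Cloudera repository, as given to the --repositories option above.
resolvers += "cloudera-repos" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

// Same coordinates as the --packages option: group, artifact, version.
libraryDependencies += "org.apache.kudu" % "kudu-spark2_2.11" % "1.9.0-cdh6.2.0"
```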

Below is a minimal Spark SQL "select" example for a Kudu table created with Impala in the "default" database. It first imports the kudu spark package, then creates a DataFrame that points to the table, and then creates a view from the DataFrame. After those steps, the table is accessible from Spark SQL. For more detail, see the Spark quickstart guide or the Kudu-Spark example.

import org.apache.kudu.spark.kudu._

// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051",
                                "kudu.table" -> "default.my_table")).format("kudu").load
// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")
// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
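
The snippet above relies on the spark session that the Spark shell creates automatically. Outside the shell, the session has to be built explicitly. A minimal sketch of the same query as a standalone application follows. The object name KuduSelectExample and the application name are hypothetical; the master address and table name are the placeholders used above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical application object; the name is illustrative only.
object KuduSelectExample {
  def main(args: Array[String]): Unit = {
    // The Spark shell provides this session as `spark`; an application must create it.
    val spark = SparkSession.builder()
      .appName("KuduSelectExample")
      .getOrCreate()

    // Same read as in the shell example: point a DataFrame at the Kudu table.
    val df = spark.read
      .options(Map("kudu.master" -> "kudu.master:7051",
                   "kudu.table" -> "default.my_table"))
      .format("kudu")
      .load()

    // Expose the DataFrame to Spark SQL under a view name, then query it.
    df.createOrReplaceTempView("my_table")
    spark.sql("select * from my_table").show()

    spark.stop()
  }
}
```

An application like this would typically be packaged as a JAR and launched with spark2-submit or spark-submit, passing the same --packages and --repositories options shown earlier.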

Below is a more sophisticated example that includes both reads and writes:

import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._

// Read a table from Kudu
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
  .format("kudu").load

// Query using the Spark API...
df.select("key").filter("key >= 5").show()

// ...or register a temporary view and use SQL
df.createOrReplaceTempView("kudu_table")
// Keep the DataFrame itself; show() returns Unit, so call it separately
val filteredDF = spark.sql("select key from kudu_table where key >= 5")
filteredDF.show()

// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)

// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
    "test_table", df.schema, Seq("key"),
    new CreateTableOptions()
        .setNumReplicas(1)
        .addHashPartitions(List("key").asJava, 3))

// Check for the existence of a Kudu table
kuduContext.tableExists("test_table")

// Insert data
kuduContext.insertRows(df, "test_table")

// Delete data
kuduContext.deleteRows(df, "test_table")

// Upsert data
kuduContext.upsertRows(df, "test_table")

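// Update data
// NB: This assumes the table has an int_val column; the $"..." column syntax needs
// spark.implicits._, which spark-shell imports automatically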
val updateDF = df.select($"key", ($"int_val" + 1).as("int_val"))
kuduContext.updateRows(updateDF, "test_table")

// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred.
// NB: The default is to upsert rows; to perform standard inserts instead, set
// "kudu.operation" -> "insert" in the options map.
// NB: Only mode Append is supported.

df.write
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table"))
  .mode("append")
  .format("kudu").save

// Delete a Kudu table
kuduContext.deleteTable("test_table")
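
The comments in the example above note that writes through the data source upsert by default and can be switched to standard inserts through the options map. A minimal sketch of building that options map follows. It assumes the option key is kudu.operation; the object KuduWriteOptions is a hypothetical helper, not part of the kudu-spark library.

```scala
// Hypothetical helper; not part of kudu-spark.
object KuduWriteOptions {
  // Builds the options map for a write through the Kudu data source.
  // "kudu.operation" is assumed to be the option key; the "upsert" default
  // mirrors the default behaviour described in the example above.
  def apply(master: String, table: String, operation: String = "upsert"): Map[String, String] =
    Map(
      "kudu.master" -> master,
      "kudu.table" -> table,
      "kudu.operation" -> operation)
}
```

With a DataFrame df and an existing table, a standard insert would then be written as df.write.options(KuduWriteOptions("kudu.master:7051", "test_table", "insert")).mode("append").format("kudu").save, keeping the write otherwise identical to the data source example above.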