Kudu integration with Spark
Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include
the kudu-spark dependency using the --packages or --jars option. Use the
kudu-spark3_2.12 artifact if using Spark 3 with Scala 2.12.
Note that Spark 1 is no longer supported in Kudu starting from version 1.6.0. To use Spark 1 with Kudu, the latest Kudu version you can use is 1.5.0.
- For --packages option:
spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
- For --jars option:
spark-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
Below is a minimal Spark SQL "select" example for a Kudu table created with Impala in the
"default" database. You first import the kudu spark package, then create a DataFrame, and then
create a view from the DataFrame. After those steps, the table is accessible from Spark SQL.
You can also refer to the Spark quickstart guide or this Kudu-Spark example.
import org.apache.kudu.spark.kudu._

// Create a DataFrame that points to the Kudu table we want to query.
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "default.my_table"))
  .format("kudu").load

// Create a view from the DataFrame to make it accessible from Spark SQL.
df.createOrReplaceTempView("my_table")

// Now we can run Spark SQL queries against our view of the Kudu table.
spark.sql("select * from my_table").show()
Below is a more sophisticated example that includes both reads and writes:
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
// Needed for the $"column" syntax below; spark-shell imports this automatically.
import spark.implicits._

// Read a table from Kudu
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
  .format("kudu").load

// Query using the Spark API...
df.select("key").filter("key >= 5").show()

// ...or register a temporary table and use SQL
df.createOrReplaceTempView("kudu_table")
val filteredDF = spark.sql("select key from kudu_table where key >= 5")
filteredDF.show()

// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)

// Create a new Kudu table from a DataFrame schema
// NB: No rows from the DataFrame are inserted into the table
kuduContext.createTable(
  "test_table", df.schema, Seq("key"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("key").asJava, 3))

// Check for the existence of a Kudu table
kuduContext.tableExists("test_table")

// Insert data
kuduContext.insertRows(df, "test_table")

// Delete data
kuduContext.deleteRows(df, "test_table")

// Upsert data
kuduContext.upsertRows(df, "test_table")

// Update data
val updateDF = df.select($"key", ($"int_val" + 1).as("int_val"))
kuduContext.updateRows(updateDF, "test_table")

// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "test_table"))
  .mode("append")
  .format("kudu").save

// Delete a Kudu table
kuduContext.deleteTable("test_table")
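The comments in the example above note that writes through the data source upsert by default and that standard inserts require setting operation = insert in the options map. The following is a minimal sketch of how such an options map could be built. The KuduOptions object and its methods are hypothetical helpers written for this illustration, and the key name "kudu.operation" is an assumption that you should verify against the kudu-spark version you use.

```scala
// Hypothetical helper, not part of kudu-spark: builds the options map
// passed to the Kudu data source.
object KuduOptions {
  // Options shared by reads and writes, as used in the examples above.
  def base(master: String, table: String): Map[String, String] =
    Map("kudu.master" -> master, "kudu.table" -> table)

  // Adds the write operation to the base options.
  // ASSUMPTION: "kudu.operation" is the key behind "operation = insert".
  def forWrite(master: String, table: String, operation: String): Map[String, String] =
    base(master, table) + ("kudu.operation" -> operation)
}

// Usage in spark-shell (needs a running Kudu cluster, so shown as a comment):
// df.write
//   .options(KuduOptions.forWrite("kudu.master:7051", "test_table", "insert"))
//   .mode("append")
//   .format("kudu").save
```

Keeping the map construction in one place makes it harder for the master address and table name to drift between reads and writes.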