Developing Applications With Apache Kudu
Apache Kudu provides C++ and Java client APIs, as well as reference examples to illustrate their use.
Viewing the API Documentation
C++ API Documentation
The documentation for the C++ client APIs is included in the header files in /usr/include/kudu/ if you installed Kudu using packages or subdirectories of src/kudu/client/ if you built Kudu from source. If you installed Kudu using parcels, no headers are included in your installation. and you will need to build Kudu from source in order to have access to the headers and shared libraries.
find /usr/include/kudu -type f -name *.h
Java API Documentation
View the Java API documentation online. Alternatively, after building the Java client, Java API documentation is available in java/kudu-client/target/apidocs/index.html.
Kudu Example Applications
- cpp/example.cc
-
A simple C++ application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
- java/java-example
-
A simple Java application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
- java/insert-loadgen
-
A small Java application which listens on a TCP socket for time series data corresponding to the Collectl wire protocol. The commonly-available collectl tool can be used to send example data to the server.
- python/dstat-kudu
-
A Java application that generates random insert load.
- python/graphite-kudu
-
An example program that shows how to use the Kudu Python API to load data into a new / existing Kudu table generated by an external program, dstat in this case.
- python/graphite-kudu
-
An example plugin for using graphite-web with Kudu as a backend.
These examples should serve as helpful starting points for your own Kudu applications and integrations.
Maven Artifacts
The following Maven <dependency> element is valid for the Apache Kudu GA release:
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.1.0</version> </dependency>
Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume) are also now available via the ASF Maven repository and the Central Maven repository.
Building the Java Client
- JDK 7
- Apache Maven 3.x
- protoc 2.6 or newer installed in your path, or built from the thirdparty/ directory. Run the following commands to build
protoc from the third-party dependencies:
thirdparty/download-thirdparty.sh thirdparty/build-thirdparty.sh protobuf
To build the Java client, clone the Kudu Git repository, change to the java directory, and issue the following command:
mvn install -DskipTests
For more information about building the Java API, as well as Eclipse integration, see java/README.md.
Kudu Python Client
The Kudu Python client provides a Python friendly interface to the C++ client API.
To install and use Kudu Python client, you need to install the Kudu C++ client libraries and headers.
- Install Cython: sudo pip install cython
- Downloaded the Kudu Python client from kudu-python: kudu-python-1.2.0.tar.gz
- Install kudu-python: sudo pip install kudu-python
The sample below demonstrates the use of part of the Python client.
import kudu from kudu.client import Partitioning from datetime import datetime # Connect to Kudu master server client = kudu.connect(host='kudu.master', port=7051) # Define a schema for a new table builder = kudu.schema_builder() builder.add_column('key').type(kudu.int64).nullable(False).primary_key() builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') schema = builder.build() # Define partitioning schema partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) # Create new table client.create_table('python-example', schema, partitioning) # Open a table table = client.table('python-example') # Create a new session so that we can apply write operations session = client.new_session() # Insert a row op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) session.apply(op) # Upsert a row op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) session.apply(op) # Updating a row op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) session.apply(op) # Delete a row op = table.new_delete({'key': 2}) session.apply(op) # Flush write operations, if failures occur, capture print them. try: session.flush() except kudu.KuduBadStatus as e: print(session.get_pending_errors()) # Create a scanner and add a predicate scanner = table.scanner() scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1)) # Open Scanner and read all tuples # Note: This doesn't scale for large scans result = scanner.open().read_all_tuples()
Example Apache Impala Commands With Kudu
See Using Apache Impala with Kudu for guidance on installing and using Impala with Kudu, including several impala-shell examples.
Kudu Integration with Spark
Kudu integrates with Spark through the Data Source API as of version 1.0.0. Include the kudu-spark dependency using the --packages option.
Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Note that Spark 1 is no longer supported in Kudu starting from version 1.6.0. So in order to use Spark 1 integrated with Kudu, version 1.5.0 is the latest to go to.
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0
Use kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11. Spark 2 artifacts are available up to version 1.8.0.
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.8.0
Then import kudu-spark and create a dataframe as demonstrated in the following sample code. In the example, replace <kudu.master> with the actual hostname of the host running a Kudu master service, and <kudu_table> with the name of a pre-existing table in Kudu.
import org.apache.kudu.spark.kudu._ import org.apache.kudu.client._ import collection.JavaConverters._ // Read a table from Kudu val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table")).kudu // Query using the Spark API... df.select("id").filter("id >= 5").show() // ...or register a temporary table and use SQL df.registerTempTable("kudu_table") val filteredDF = spark.sql("select id from kudu_table where id >= 5").show() // Use KuduContext to create, delete, or write to Kudu tables val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext) // Create a new Kudu table from a dataframe schema // NB: No rows from the dataframe are inserted into the table kuduContext.createTable( "test_table", df.schema, Seq("key"), new CreateTableOptions() .setNumReplicas(1) .addHashPartitions(List("key").asJava, 3)) // Insert data kuduContext.insertRows(df, "test_table") // Delete data kuduContext.deleteRows(filteredDF, "test_table") // Upsert data kuduContext.upsertRows(df, "test_table") // Update data val alteredDF = df.select("id", $"count" + 1) kuduContext.updateRows(filteredRows, "test_table") // Data can also be inserted into the Kudu table using the data source, though the methods on // KuduContext are preferred // NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert // in the options map // NB: Only mode Append is supported df.write.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table")).mode("append").kudu // Check for the existence of a Kudu table kuduContext.tableExists("another_table") // Delete a Kudu table kuduContext.deleteTable("unwanted_table")
Upsert Option in Kudu Spark
The upsert operation in kudu-spark supports an extra write option of ignoreNull. If set to true, it will avoid setting existing column values in Kudu table to Null if the corresponding dataframe column values are Null. If unspecified, ignoreNull is false by default.
val dataDF = spark.read.options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> simpleTableName)).kudu dataDF.registerTempTable(simpleTableName) dataDF.show() // Below is the original data in the table 'simpleTableName' +---+---+ |key|val| +---+---+ | 0|foo| +---+---+ // Upsert a row with existing key 0 and val Null with ignoreNull set to true val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val") val wo = new KuduWriteOptions wo.ignoreNull = true kuduContext.upsertRows(nullDF, simpleTableName, wo) dataDF.show() // The val field stays unchanged +---+---+ |key|val| +---+---+ | 0|foo| +---+---+ // Upsert a row with existing key 0 and val Null with ignoreNull default/set to false kuduContext.upsertRows(nullDF, simpleTableName) // Equivalent to: // val wo = new KuduWriteOptions // wo.ignoreNull = false // kuduContext.upsertRows(nullDF, simpleTableName, wo) df.show() // The val field is set to Null this time +---+----+ |key| val| +---+----+ | 0|null| +---+----+
Using Spark with a Secure Kudu Cluster
The Kudu-Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials. For Spark jobs using the default 'client' deploy mode, the submitting user must have an active Kerberos ticket granted through kinit. For Spark jobs using the 'cluster' deploy mode, a Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark2-submit.
Spark Integration Known Issues and Limitations
-
Spark 2.2 (and higher) requires Java 8 at runtime even though Kudu Spark 2.x integration is Java 7 compatible. Spark 2.2 is the default dependency version as of Kudu 1.5.0.
-
Kudu tables with a name containing upper case or non-ASCII characters must be assigned an alternate name when registered as a temporary table.
-
Kudu tables with a column name containing upper case or non-ASCII characters must not be used with SparkSQL. Columns can be renamed in Kudu to work around this issue.
-
<> and ORpredicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates with a suffix wildcard are pushed to Kudu. This means LIKE "FOO%" will be pushed, but LIKE "FOO%BAR" won't.
-
Kudu does not support all the types supported by Spark SQL. For example, Date and complex types are not supported.
-
Kudu tables can only be registered as temporary tables in SparkSQL.
-
Kudu tables cannot be queried using HiveContext.
Spark Integration Best Practices
Avoid multiple Kudu clients per cluster
One common Kudu-Spark coding error is instantiating extra KuduClient objects. In kudu-spark, a KuduClient is owned by the KuduContext. Spark application code should not create another KuduClient connecting to the same cluster. Instead, application code should use the KuduContext to access a KuduClient using KuduContext#syncClient.
To diagnose multiple KuduClient instances in a Spark job, look for signs in the logs of the master being overloaded by many GetTableLocations or GetTabletLocations requests coming from different clients, usually around the same time. This symptom is especially likely in Spark Streaming code, where creating a KuduClient per task will result in periodic waves of master requests from new clients.
Integration with MapReduce, YARN, and Other Frameworks
Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in the Hadoop ecosystem. See RowCounter.java and ImportCsv.java for examples which you can model your own integrations on.