Calling Hive User-Defined Functions
You can call built-in Hive UDFs, UDAFs, and UDTFs from Spark SQL applications, as long as the functions are available in the standard Hive .jar file. When using Hive UDFs, use HiveContext (not SQLContext).
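For example, built-in Hive functions such as str_to_map(text, delimiter1, delimiter2) can be invoked through a HiveContext. A minimal sketch, assuming a hypothetical settings table with a string column named config:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// str_to_map is a built-in Hive UDF; the settings table and its config column are hypothetical.
hiveContext.sql("SELECT str_to_map(config, ';', '=') FROM settings").collect.foreach(println)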
The following interactive example reads from and writes to HDFS under Hive directories, using hiveContext and the built-in collect_list(col) UDF. The collect_list(col) UDF returns a list of objects, with duplicates. In a production environment, this type of operation would run under an account with appropriate HDFS permissions; the following example uses the hdfs user.
Launch the Spark Shell on a YARN cluster:
su hdfs
cd $SPARK_HOME
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
At the Scala REPL prompt, construct a HiveContext:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
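If TestTable does not already exist, you can register a small temporary table so the next step has data to aggregate. A minimal sketch with hypothetical sample rows (Spark 1.3+ API):
import hiveContext.implicits._
// Hypothetical key/value rows; key 1 carries a duplicate value on purpose,
// so the duplicate-preserving behavior of collect_list is visible.
val testData = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"))).toDF("key", "value")
testData.registerTempTable("TestTable")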
Invoke the Hive collect_list UDF:
scala> hiveContext.sql("FROM TestTable SELECT key, collect_list(value) GROUP BY key ORDER BY key").collect.foreach(println)
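Because collect_list retains duplicates, you can substitute the built-in collect_set UDAF when you want distinct values only. For example:
scala> hiveContext.sql("FROM TestTable SELECT key, collect_set(value) GROUP BY key ORDER BY key").collect.foreach(println)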
Using Custom UDFs
You can register custom functions in Python, Java, or Scala, and use them within SQL statements. When using a custom UDF, make sure that the .jar file for your UDF is included with your application, or use the --jars command-line option to specify the file.
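You can also register a plain Scala function directly with SQLContext and call it from SQL, without packaging a Hive UDF .jar. A minimal sketch (Spark 1.3+ API; the people table and its name column are hypothetical):
// Register a Scala closure as a SQL function named toUpper.
sqlContext.udf.register("toUpper", (s: String) => if (s == null) null else s.toUpperCase)
sqlContext.sql("SELECT toUpper(name) FROM people").collect.foreach(println)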
The following example uses a custom Hive UDF, invoked through the more limited SQLContext rather than HiveContext.
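A custom Hive UDF such as the balance function used below is a Java or Scala class that extends org.apache.hadoop.hive.ql.exec.UDF and defines one or more evaluate methods, which Hive resolves by reflection. The balance UDF itself is application-specific; a minimal sketch of the general shape, with a hypothetical package and class, compiled into the .jar that you pass to --jars:
package com.example.hiveudf  // hypothetical package

import org.apache.hadoop.hive.ql.exec.UDF

// A trivial Hive UDF; Hive locates evaluate() by reflection at query time.
class ToUpper extends UDF {
  def evaluate(s: String): String = if (s == null) null else s.toUpperCase
}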
Launch spark-shell with hive-udf.jar as its parameter:
./bin/spark-shell --jars <path-to-your-hive-udf>.jar
From spark-shell, define a function:
scala> sqlContext.sql("""create temporary function balance as 'org.package.hiveudf.BalanceFromRechargesAndOrders'""");
From spark-shell, invoke your UDF:
scala> sqlContext.sql(""" create table recharges_with_balance_array as select reseller_id, phone_number, phone_credit_id, date_recharge, phone_credit_value, balance(orders,'date_order', 'order_value', reseller_id, date_recharge, phone_credit_value) as balance from orders """);