Calling Hive User-Defined Functions
You can call built-in Hive UDFs, UDAFs, and UDTFs, as well as custom UDFs, from Spark SQL applications if the functions are available in the standard Hive .jar file. When using Hive UDFs, use HiveContext (not SQLContext).
Using Built-in UDFs
The following interactive example reads from and writes to HDFS under Hive directories, using hiveContext and the built-in collect_list(col) UDF. The collect_list(col) UDF returns a list of objects with duplicates. In a production environment, this type of operation runs under an account with appropriate HDFS permissions; the following example uses the hdfs user.
Launch the Spark Shell on a YARN cluster:
su hdfs
cd $SPARK_HOME
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
At the Scala REPL prompt, construct a HiveContext instance:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
Invoke the Hive collect_list UDF:
scala> hiveContext.sql("from TestTable SELECT key, collect_list(value) group by key order by key").collect.foreach(println)
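The query above assumes that TestTable already exists in the Hive metastore. If it does not, the following is a minimal sketch for creating and populating it from the same Scala REPL session; the key/value schema matches the query above, but the sample rows are hypothetical:
// Hypothetical sample data; TestTable and its key/value columns come from the query above.
val sampleRows = sc.parallelize(Seq(("a", "1"), ("a", "2"), ("b", "3")))
val sampleDF = hiveContext.createDataFrame(sampleRows).toDF("key", "value")
// Persist the DataFrame as a Hive table so the collect_list query has data to read.
sampleDF.write.saveAsTable("TestTable")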
Using Custom UDFs
You can register custom functions in Python, Java, or Scala, and use them within SQL statements.
When using a custom UDF, ensure that the .jar file for your UDF is included with your application, or use the --jars command-line option to specify the file.
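For reference, a custom Hive UDF is typically a class that extends org.apache.hadoop.hive.ql.exec.UDF and defines one or more evaluate methods, compiled and packaged into the .jar file passed to --jars. The following is a minimal sketch in Scala; the class name and behavior are hypothetical and are not the balance UDF used below:
import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical UDF: returns the input string in upper case.
// Hive selects the evaluate method that matches the argument types at query time.
class ToUpper extends UDF {
  def evaluate(input: String): String =
    if (input == null) null else input.toUpperCase
}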
The following example uses a custom Hive UDF. This example uses the more limited SQLContext rather than HiveContext.
Launch spark-shell with hive-udf.jar as its parameter:
./bin/spark-shell --jars <path-to-your-hive-udf>.jar
From spark-shell, define a function:
scala> sqlContext.sql("""create temporary function balance as 'org.package.hiveudf.BalanceFromRechargesAndOrders'""");
From spark-shell, invoke your UDF:
scala> sqlContext.sql("""
create table recharges_with_balance_array as
select
  reseller_id,
  phone_number,
  phone_credit_id,
  date_recharge,
  phone_credit_value,
  balance(orders, 'date_order', 'order_value', reseller_id, date_recharge, phone_credit_value) as balance
from orders
""");