Automating Spark Jobs with Oozie Spark Action

You can use Apache Spark as part of a complex workflow with multiple processing steps, triggers, and interdependencies. You can automate Apache Spark jobs using Oozie Spark action.

Spark3 (Gateway) must be installed on the node where the Oozie server is installed.

About Oozie Spark Action

If you use Apache Spark as part of a complex workflow with multiple processing steps, triggers, and interdependencies, consider using Apache Oozie to automate jobs. Oozie is a workflow engine that executes sequences of actions structured as directed acyclic graphs (DAGs). Each action is an individual unit of work, such as a Spark job or Hive query.

The Oozie "Spark action" runs a Spark job as part of an Oozie workflow. The workflow waits until the Spark job completes before continuing to the next action.

For additional information about Spark action, see the latest Oozie Spark3 Action Extension documentation, available on the Oozie UI. For general information about Oozie, see Overview of Oozie.

Configure Oozie Spark Action for Spark

  1. Set up .jar file exclusions.

    Oozie distributes its own libraries on the ShareLib, which are included on the classpath. These .jar files may conflict with each other if some components require different versions of a library. You can use the oozie.action.sharelib.for.<action_type>.exclude=<value> property to address these scenarios.

    The exclusion pattern also works for Spark3 actions; use spark3 as the action type. Jackson should not be an issue for Spark3 actions.

    Examples

    The following examples show how to use a ShareLib exclude on a Java action.

    Actual ShareLib content:

       * /user/oozie/share/lib/lib_20180701/oozie/lib-one-1.5.jar
       * /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar
       * /user/oozie/share/lib/lib_20180701/java/component-connector.jar

    Setting the oozie.action.sharelib.for.java.exclude property to oozie/lib-one.* results in the following distributed cache content:

       * /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar
       * /user/oozie/share/lib/lib_20180701/java/component-connector.jar

    Setting the oozie.action.sharelib.for.java.exclude property to oozie/lib-one.*|component-connector.jar results in the following distributed cache content:

       * /user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar
       * /user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar
  2. Run the Oozie shareliblist command to verify the configuration. You should see spark in the results.
    oozie admin -shareliblist spark
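The effect of the exclude property in step 1 can be sketched in Python. This is only an illustration, assuming the property value is treated as a regular expression that is searched for within each ShareLib path; the matching logic inside Oozie may differ in detail. The function name apply_exclude is hypothetical and is not part of Oozie.

```python
import re

# ShareLib content taken from the example in step 1.
SHARELIB = [
    "/user/oozie/share/lib/lib_20180701/oozie/lib-one-1.5.jar",
    "/user/oozie/share/lib/lib_20180701/oozie/lib-two-1.5.jar",
    "/user/oozie/share/lib/lib_20180701/java/lib-one-2.6.jar",
    "/user/oozie/share/lib/lib_20180701/java/lib-two-2.6.jar",
    "/user/oozie/share/lib/lib_20180701/java/component-connector.jar",
]


def apply_exclude(paths, exclude_pattern):
    """Return the paths that remain after applying an exclude pattern.

    Assumption: a path is excluded when the regular expression matches
    anywhere inside it. This reproduces the documented examples only.
    """
    regex = re.compile(exclude_pattern)
    return [path for path in paths if not regex.search(path)]


if __name__ == "__main__":
    for pattern in ("oozie/lib-one.*",
                    "oozie/lib-one.*|component-connector.jar"):
        print("exclude =", pattern)
        for path in apply_exclude(SHARELIB, pattern):
            print("  ", path)
```

Running the sketch prints the same two lists of remaining files that the examples in step 1 show, which makes it a convenient way to try a pattern before setting it on a real action.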

The following examples show a workflow definition XML file, an Oozie job configuration file, and a Python script for running a Spark3-Pi job.

Sample Workflow.xml file for spark3-Pi:

<workflow-app xmlns='uri:oozie:workflow:1.0' name='SparkPythonPi'>
  <start to='spark-node' />

  <action name='spark-node'>
    <spark3 xmlns="uri:oozie:spark3-action:1.0">
      <resource-manager>${resourceManager}</resource-manager>
      <name-node>${nameNode}</name-node>
      <master>${master}</master>
      <mode>${mode}</mode>
      <name>Python-Spark-Pi</name>
      <jar>pi.py</jar>
    </spark3>
    <ok to="end" />
    <error to="fail" />
  </action>

  <kill name="fail">
    <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name='end' />
</workflow-app>

Sample Job.properties file for spark3-Pi:

nameNode=hdfs://host:8020
resourceManager=host:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/pyspark
master=yarn
mode=cluster
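
The ${...} references in the properties file determine where the workflow application must be located in HDFS. As a rough illustration, the following Python sketch expands those references for the sample file. It is not how Oozie itself parses the file; the helper names parse_properties and resolve, and the user name alice, are hypothetical values chosen for the example.

```python
import re

# Sample job configuration from this section.
JOB_PROPERTIES = """\
nameNode=hdfs://host:8020
resourceManager=host:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/pyspark
master=yarn
mode=cluster
"""

_REFERENCE = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve(value, props, depth=0):
    """Expand ${name} references; unknown names are left untouched."""
    if depth > 10:
        raise ValueError("too many nested references")

    def substitute(match):
        name = match.group(1)
        if name not in props:
            return match.group(0)
        return resolve(props[name], props, depth + 1)

    return _REFERENCE.sub(substitute, value)


if __name__ == "__main__":
    props = parse_properties(JOB_PROPERTIES)
    props["user.name"] = "alice"  # hypothetical submitting user
    print(resolve(props["oozie.wf.application.path"], props))
```

With the hypothetical user alice, the sketch prints hdfs://host:8020/user/alice/examples/apps/pyspark, which is the directory that would need to contain the workflow definition and the lib/pi.py script.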

Sample Python script, lib/pi.py:

import sys
from random import random
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="Python-Spark-Pi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        # Sample a random point in the square [-1, 1) x [-1, 1) and
        # return 1 if it falls inside the unit circle, otherwise 0.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    # Count the points inside the circle across all partitions.
    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    sc.stop()
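
To check the estimation logic without a cluster, the same Monte Carlo calculation can be sketched in plain Python. This version is an assumption-laden stand-in for local testing only: it does not use Spark, and the function name estimate_pi and its seed parameter are hypothetical additions.

```python
import random


def estimate_pi(n, seed=None):
    """Estimate pi from n random points in the square [-1, 1) x [-1, 1).

    The fraction of points inside the unit circle approximates pi / 4.
    """
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        x = rng.random() * 2 - 1
        y = rng.random() * 2 - 1
        if x ** 2 + y ** 2 < 1:
            inside += 1
    return 4.0 * inside / n


if __name__ == "__main__":
    # 200000 samples matches the default of 2 partitions in pi.py.
    print("Pi is roughly %f" % estimate_pi(200000))
```

The result varies from run to run unless a seed is passed, just as the Spark job output does.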

Troubleshooting .jar file conflicts with Oozie Spark action

When using Oozie Spark action, Oozie jobs may fail with an error similar to the following if there are .jar file conflicts between the "oozie" ShareLib and the "spark3" ShareLib. In such cases, you need to remove the conflicting JAR files.

2018-06-04 13:27:32,652 WARN SparkActionExecutor:523 - SERVER[XXXX] USER[XXXX] GROUP[-] TOKEN[] APP[XXXX] JOB[0000000-<XXXXX>-oozie-oozi-W] ACTION[0000000-<XXXXXX>-oozie-oozi-W@spark2] Launcher exception: Attempt to add (hdfs://XXXX/user/oozie/share/lib/lib_XXXXX/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache. 
java.lang.IllegalArgumentException: Attempt to add (hdfs://XXXXX/user/oozie/share/lib/lib_20170727191559/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache. 
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13$anonfun$apply$8.apply(Client.scala:632) 
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13$anonfun$apply$8.apply(Client.scala:623) 
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74) 
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13.apply(Client.scala:623) 
at org.apache.spark.deploy.yarn.Client$anonfun$prepareLocalResources$13.apply(Client.scala:622) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:622) 
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895) 
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171) 
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1231) 
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1290) 
at org.apache.spark.deploy.yarn.Client.main(Client.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:750) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:311) 
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:232) 
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:58) 
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:62) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:237) 
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) 
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) 
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164) 

Run the following commands to resolve this issue, replacing <ts> with the timestamp of your ShareLib directory.

hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark3/aws* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark3/azure* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark3/hadoop-aws* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark3/hadoop-azure* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark3/ok*
hadoop fs -mv /user/oozie/share/lib/lib_<ts>/oozie/jackson* /user/oozie/share/lib/lib_<ts>/oozie.old 

Next, run the following command to update the Oozie ShareLib:

oozie admin -oozie http://<oozie-server-hostname>:11000/oozie -sharelibupdate
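
Before removing files, it can help to see which .jar file names appear in more than one ShareLib subdirectory. The following Python sketch compares file listings by base name. It assumes that a shared file name indicates a potential conflict, which is a heuristic and not the check that Oozie or Spark performs. The function name find_duplicate_jars and the spark3 and java paths in the sample data are hypothetical; in practice, the listings would come from the output of hadoop fs -ls on each directory.

```python
import posixpath
from collections import defaultdict


def find_duplicate_jars(listings):
    """Return file names that occur in more than one ShareLib subdirectory.

    listings maps a subdirectory name (for example "oozie") to the list
    of file paths found in that subdirectory.
    """
    seen = defaultdict(set)
    for subdir, paths in listings.items():
        for path in paths:
            seen[posixpath.basename(path)].add(subdir)
    return {name: sorted(dirs) for name, dirs in seen.items() if len(dirs) > 1}


if __name__ == "__main__":
    # Hypothetical listings; substitute the real contents of each directory.
    listings = {
        "oozie": [
            "/user/oozie/share/lib/lib_<ts>/oozie/aws-java-sdk-kms-1.10.6.jar",
            "/user/oozie/share/lib/lib_<ts>/oozie/lib-two-1.5.jar",
        ],
        "spark3": [
            "/user/oozie/share/lib/lib_<ts>/spark3/aws-java-sdk-kms-1.10.6.jar",
        ],
    }
    for name, dirs in find_duplicate_jars(listings).items():
        print(name, "found in", ", ".join(dirs))
```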