Spark indexing using morphlines

If you are using Apache Spark, you can batch index data using CrunchIndexerTool.

CrunchIndexerTool is a Spark or MapReduce ETL batch job that pipes data from HDFS files into Apache Solr through a morphline for extraction and transformation. The program is designed for flexible, scalable, fault-tolerant batch ETL pipeline jobs. It is implemented as an Apache Crunch pipeline, allowing it to run on either the MapReduce or the Spark execution engine. The tool supports both Spark 2 and Spark 3; Spark 2 support will be discontinued in a future release.
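
The morphline itself is a plain-text configuration file that you pass to the job with the --morphline-file option; the examples later in this topic reference such a file, loadSolrLine.conf. For orientation, the following is a minimal sketch of what such a file might look like for line-oriented text input. The collection name and the exact command chain (readLine, generateUUID, sanitizeUnknownSolrFields, loadSolr) are illustrative assumptions, not the contents of any file shipped with the product:

# Minimal morphline sketch (illustrative; adjust the collection and commands for your data)
SOLR_LOCATOR : {
  collection : collection1          # assumed collection name
  zkHost : "${ZK_HOST}"             # supplied at runtime via -D morphlineVariable.ZK_HOST=...
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      { readLine { charset : UTF-8 } }                                # emit one record per input line
      { generateUUID { field : id } }                                 # add a unique id field to each record
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } } # drop fields not in the Solr schema
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }                  # send the record to Solr
    ]
  }
]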

CrunchIndexerTool requires a working MapReduce or Spark cluster, such as one installed using Cloudera Manager.

You can see the usage syntax for CrunchIndexerTool by running the job with the -help argument. Unlike other Search indexing tools, the CrunchIndexerTool jar does not contain all of its dependencies. If you try to run the job without first adding the dependencies to the classpath, you get an error such as the following:

hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/crunch/types/PType
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.PType
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 4 more

To see the command usage (or to run the job), you must first add the dependencies to the classpath:

export HADOOP_CLASSPATH="/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch/*"
hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help

For reference, here is the command usage syntax:

MapReduceUsage: export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop jar $myDriverJar
org.apache.solr.crunch.CrunchIndexerTool --libjars $myDependencyJarFiles [MapReduceGenericOptions]...
        [--input-file-list URI] [--input-file-format FQCN]
        [--input-file-projection-schema FILE]
        [--input-file-reader-schema FILE] --morphline-file FILE
        [--morphline-id STRING] [--pipeline-type STRING] [--xhelp]
        [--mappers INTEGER] [--parallel-morphline-inits INTEGER]
        [--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]]

SparkUsage: spark-submit [SparkGenericOptions]... --master local|yarn --deploy-mode client|cluster
--jars $myDependencyJarFiles --class org.apache.solr.crunch.CrunchIndexerTool $myDriverJar
        [--input-file-list URI] [--input-file-format FQCN]
        [--input-file-projection-schema FILE]
        [--input-file-reader-schema FILE] --morphline-file FILE
        [--morphline-id STRING] [--pipeline-type STRING] [--xhelp]
        [--mappers INTEGER] [--parallel-morphline-inits INTEGER]
        [--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]]

Spark or MapReduce ETL batch job  that  pipes data from (splittable or non-
splittable) HDFS files into Apache Solr,  and  along  the way runs the data
through a Morphline  for  extraction  and  transformation.  The  program is
designed for  flexible,  scalable  and  fault-tolerant  batch  ETL pipeline
jobs. It is implemented as an  Apache  Crunch  pipeline and as such can run
on either the Apache Hadoop MapReduce or Apache Spark execution engine.

The program proceeds in several consecutive phases, as follows:

1) Randomization phase: This (parallel)  phase  randomizes the list of HDFS
input files in order to spread ingestion  load more evenly among the mapper
tasks of the  subsequent  phase.  This  phase  is  only  executed  for non-
splittables files, and skipped otherwise.

2) Extraction phase: This  (parallel)  phase  emits  a  series of HDFS file
input streams (for non-splittable files) or  a series of input data records
(for splittable files).

3) Morphline  phase:  This  (parallel)  phase  receives  the  items  of the
previous phase, and  uses  a  Morphline  to  extract  the relevant content,
transform  it  and  load  zero  or   more  documents  into  Solr.  The  ETL
functionality is  flexible  and  customizable  using  chains  of  arbitrary
morphline commands that pipe  records  from  one  transformation command to
another. Commands to parse and  transform  a  set  of standard data formats
such as Avro, Parquet,  CSV,  Text,  HTML,  XML,  PDF,  MS-Office, etc. are
provided out of the box,  and  additional  custom  commands and parsers for
additional file or data formats can  be added as custom morphline commands.
Any kind of data format can be processed  and any kind output format can be
generated by any custom Morphline ETL  logic.  Also, this phase can be used
to send data  directly  to  a  live  SolrCloud  cluster  (via  the loadSolr
morphline command).

The program  is  implemented  as  a  Crunch  pipeline  and  as  such Crunch
optimizes the logical phases  mentioned  above  into  an efficient physical
execution plan that runs a single  mapper-only job, or as the corresponding
Spark equivalent.

Fault Tolerance: Task attempts  are  retried  on  failure  per the standard
MapReduce or Spark semantics. If the  whole  job fails you can retry simply
by rerunning the program again using the same arguments.

Comparison with MapReduceIndexerTool:

1) CrunchIndexerTool can also run on  the  Spark execution engine, not just
on MapReduce.
2)  CrunchIndexerTool  enables  interactive  low  latency  prototyping,  in
particular in Spark 'local' mode.
3) CrunchIndexerTool supports updates  (and  deletes) of existing documents
in Solr, not just inserts.
4) CrunchIndexerTool can exploit data  locality for splittable Hadoop files
(text, avro, avroParquet).
We recommend  MapReduceIndexerTool  for  large  scale  batch  ingestion use
cases where updates (or  deletes)  of  existing  documents  in Solr are not
required, and we recommend CrunchIndexerTool for all other use cases.

CrunchIndexerOptions:
  HDFS_URI               HDFS URI of  file  or  directory  tree  to ingest.
                         (default: [])
  --input-file-list URI, --input-list URI
                         Local URI or  HDFS  URI  of  a  UTF-8 encoded file
                         containing a list of HDFS  URIs to ingest, one URI
                         per line in the  file.  If  '-' is specified, URIs
                         are read  from  the  standard  input.  Multiple --
                         input-file-list arguments can be specified.
  --input-file-format FQCN
                         The Hadoop FileInputFormat  to  use for extracting
                         data from splittable HDFS  files.  Can  be a fully
                         qualified Java  class  name  or  one  of  ['text',
                         'avro', 'avroParquet']. If this  option is present
                         the extraction phase will  emit  a series of input
                         data records rather  than  a  series  of HDFS file
                         input streams.
  --input-file-projection-schema FILE
                         Relative or absolute path  to  an Avro schema file
                         on the local file  system.  This  will  be used as
                         the projection schema for Parquet input files.
  --input-file-reader-schema FILE
                         Relative or absolute path  to  an Avro schema file
                         on the local file  system.  This  will  be used as
                         the  reader  schema  for  Avro  or  Parquet  input
                         files.      Example:      src/test/resources/test-
                         documents/strings.avsc
  --morphline-file FILE  Relative or absolute path  to  a local config file
                         that contains one  or  more  morphlines.  The file
                         must be UTF-8  encoded.  It  will  be  uploaded to
                         each remote task. Example: /path/to/morphline.conf
  --morphline-id STRING  The identifier  of  the  morphline  that  shall be
                         executed  within   the   morphline   config   file
                         specified by --morphline-file. If the --morphline-
                         id option is  omitted  the  first  (i.e. top-most)
                         morphline  within  the   config   file   is  used.
                         Example: morphline1
  --pipeline-type STRING
                         The engine to use  for  executing  the job. Can be
                         'mapreduce' or 'spark'. (default: mapreduce)
  --xhelp, --help, -help
                         Show this help message and exit
  --mappers INTEGER      Tuning knob that indicates  the  maximum number of
                         MR mapper tasks to use.  -1  indicates use all map
                         slots available  on  the  cluster.  This parameter
                         only  applies   to   non-splittable   input  files
                         (default: -1)
  --parallel-morphline-inits INTEGER
                         Tuning knob that indicates  the  maximum number of
                         morphline instances  to  initialize  at  the  same
                         time. This kind of rate  limiting on rampup can be
                         useful  to  avoid  overload   conditions  such  as
                         ZooKeeper connection limits  or  DNS lookup limits
                         when using  many  parallel  mapper  tasks  because
                         each  such   task   contains   one   morphline.  1
                         indicates initialize  each  morphline  separately.
                         This feature  is  implemented  with  a distributed
                         semaphore. The default is to  use no rate limiting
                         (default: 2147483647)
  --dry-run              Run the pipeline  but  print  documents  to stdout
                         instead of loading  them  into  Solr.  This can be
                         used for quicker turnaround  during  early trial &
                         debug sessions. (default: false)
  --log4j FILE           Relative or absolute  path  to  a log4j.properties
                         config file on the  local  file  system. This file
                         will be uploaded  to  each  remote  task. Example:
                         /path/to/log4j.properties
  --chatty               Turn on verbose output. (default: false)

SparkGenericOptions:     To print all options run 'spark-submit --help'

MapReduceGenericOptions: Generic options supported are
  --conf <configuration file>
                         specify an application configuration file
  -D <property=value>    use value for given property
  --fs <local|namenode:port>
                         specify a namenode
  --jt <local|resourcemanager:port>
                         specify a ResourceManager
  --files <comma separated list of files>
                         specify comma separated files to  be copied to the
                         map reduce cluster
  --libjars <comma separated list of jars>
                         specify comma separated  jar  files  to include in
                         the classpath.
  --archives <comma separated list of archives>
                         specify comma separated archives  to be unarchived
                         on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]


Examples:

# Prepare - Copy input files into HDFS:
export myResourcesDir=src/test/resources # for build from git
export myResourcesDir=/opt/cloudera/parcels/CDH/share/doc/search-*/search-crunch # for CDH with parcels
export myResourcesDir=/usr/share/doc/search-*/search-crunch # for CDH with packages
hadoop fs -copyFromLocal $myResourcesDir/test-documents/hello1.txt hdfs:/user/systest/input/

# Prepare variables for convenient reuse:
export myDriverJarDir=target # for build from git
export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch # for CDH with parcels
export myDriverJarDir=/usr/lib/solr/contrib/crunch # for CDH with packages
export myDependencyJarDir=target/lib # for build from git
export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch # for CDH with parcels
export myDependencyJarDir=/usr/lib/search/lib/search-crunch # for CDH with packages
export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar')
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)
export myDependencyJarPaths=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ':' | head -c -1)
export myJVMOptions="-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/" # connection settings for solrj, also custom tmp dir

# MapReduce on Yarn - Ingest text file line by line into Solr:
export HADOOP_CLIENT_OPTS="$myJVMOptions"; export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop \
  --config /etc/hadoop/conf.cloudera.YARN-1 \
  jar $myDriverJar org.apache.solr.crunch.CrunchIndexerTool \
  --libjars $myDependencyJarFiles \
  -D mapreduce.map.java.opts="-Xmx500m $myJVMOptions" \
  -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
  --files $myResourcesDir/test-documents/string.avsc \
  --morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \
  --pipeline-type mapreduce \
  --chatty \
  --log4j $myResourcesDir/log4j.properties \
  /user/systest/input/hello1.txt

# Spark in Local Mode (for rapid prototyping) - Ingest into Solr:
spark-submit \
  --master local \
  --deploy-mode client \
  --jars $myDependencyJarFiles \
  --executor-memory 500M \
  --conf "spark.executor.extraJavaOptions=$myJVMOptions" \
  --driver-java-options "$myJVMOptions" \
  # --driver-library-path /opt/cloudera/parcels/CDH/lib/hadoop/lib/native # for Snappy on CDH with parcels\
  # --driver-library-path /usr/lib/hadoop/lib/native # for Snappy on CDH with packages \
  --class org.apache.solr.crunch.CrunchIndexerTool \
  $myDriverJar \
  -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
  --morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \
  --pipeline-type spark \
  --chatty \
  --log4j $myResourcesDir/log4j.properties \
  /user/systest/input/hello1.txt

# Spark on Yarn in Client Mode (for testing) - Ingest into Solr:
Same as above, except replace '--master local' with '--master yarn'

# View the yarn executor log files (there is no GUI yet):
yarn logs --applicationId $application_XYZ

# Spark on Yarn in Cluster Mode (for production) - Ingest into Solr:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars $myDependencyJarFiles \
  --executor-memory 500M \
  --conf "spark.executor.extraJavaOptions=$myJVMOptions" \
  --driver-java-options "$myJVMOptions" \
  --class org.apache.solr.crunch.CrunchIndexerTool \
  --files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf)\
  $myDriverJar \
  -D hadoop.tmp.dir=/tmp \
  -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
  --morphline-file loadSolrLine.conf \
  --pipeline-type spark \
  --chatty \
  --log4j log4j.properties \
  /user/systest/input/hello1.txt

# Spark on Yarn in Cluster Mode (for production) - Ingest into Secure (Kerberos-enabled) Solr:
# Spark requires two additional steps compared to non-secure solr:
# (NOTE: MapReduce does not require extra steps for communicating with kerberos-enabled Solr)
# 1) Create a delegation token file
#    a) kinit as the user who will make solr requests
#    b) request a delegation token from solr and save it to a file:
#       e.g. using curl:
#       "curl --negotiate -u: http://solr-host:port/solr/admin?op=GETDELEGATIONTOKEN > tokenFile.txt"
# 2) Pass the delegation token file to spark-submit:
#    a) add the delegation token file via --files
#    b) pass the file name via -D tokenFile
#       spark places this file in the cwd of the executor, so only list the file name, no path
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars $myDependencyJarFiles \
  --executor-memory 500M \
  --conf "spark.executor.extraJavaOptions=$myJVMOptions" \
  --driver-java-options "$myJVMOptions" \
  --class org.apache.solr.crunch.CrunchIndexerTool \
  --files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf),tokenFile.txt\
  $myDriverJar \
  -D hadoop.tmp.dir=/tmp \
  -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
  -DtokenFile=tokenFile.txt \
  --morphline-file loadSolrLine.conf \
  --pipeline-type spark \
  --chatty \
  --log4j log4j.properties \
  /user/systest/input/hello1.txt