Spark indexing using morphlines
If you are using Apache Spark, you can batch index data using the CrunchIndexerTool.
CrunchIndexerTool is a Spark or MapReduce ETL batch job that pipes data from HDFS files into Apache Solr through a morphline for extraction and transformation. The program is designed for flexible, scalable, fault-tolerant batch ETL pipeline jobs. It is implemented as an Apache Crunch pipeline, allowing it to run on MapReduce or Spark execution engines.
CrunchIndexerTool requires a working MapReduce or Spark cluster, such as one installed using Cloudera Manager.
You can see the usage syntax CrunchIndexerTool
by
running the job with the -help
argument. Unlike other
Search indexing tools, the CrunchIndexerTool
jar does not
contain all dependencies. If you try to run the job without addressing
this, you get an error such as the following:
hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/crunch/types/PType
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.PType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 4 more
To see the command usage (or to run the job), you must first add the dependencies to the classpath:
export HADOOP_CLASSPATH="/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch/*"
hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help
For reference, here is the command usage syntax:
MapReduceUsage: export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop jar $myDriverJar
org.apache.solr.crunch.CrunchIndexerTool --libjars $myDependencyJarFiles [MapReduceGenericOptions]...
[--input-file-list URI] [--input-file-format FQCN]
[--input-file-projection-schema FILE]
[--input-file-reader-schema FILE] --morphline-file FILE
[--morphline-id STRING] [--pipeline-type STRING] [--xhelp]
[--mappers INTEGER] [--parallel-morphline-inits INTEGER]
[--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]]
SparkUsage: spark-submit [SparkGenericOptions]... --master local|yarn --deploy-mode client|cluster
--jars $myDependencyJarFiles --class org.apache.solr.crunch.CrunchIndexerTool $myDriverJar
[--input-file-list URI] [--input-file-format FQCN]
[--input-file-projection-schema FILE]
[--input-file-reader-schema FILE] --morphline-file FILE
[--morphline-id STRING] [--pipeline-type STRING] [--xhelp]
[--mappers INTEGER] [--parallel-morphline-inits INTEGER]
[--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]]
Spark or MapReduce ETL batch job that pipes data from (splittable or non-
splittable) HDFS files into Apache Solr, and along the way runs the data
through a Morphline for extraction and transformation. The program is
designed for flexible, scalable and fault-tolerant batch ETL pipeline
jobs. It is implemented as an Apache Crunch pipeline and as such can run
on either the Apache Hadoop MapReduce or Apache Spark execution engine.
The program proceeds in several consecutive phases, as follows:
1) Randomization phase: This (parallel) phase randomizes the list of HDFS
input files in order to spread ingestion load more evenly among the mapper
tasks of the subsequent phase. This phase is only executed for non-
splittables files, and skipped otherwise.
2) Extraction phase: This (parallel) phase emits a series of HDFS file
input streams (for non-splittable files) or a series of input data records
(for splittable files).
3) Morphline phase: This (parallel) phase receives the items of the
previous phase, and uses a Morphline to extract the relevant content,
transform it and load zero or more documents into Solr. The ETL
functionality is flexible and customizable using chains of arbitrary
morphline commands that pipe records from one transformation command to
another. Commands to parse and transform a set of standard data formats
such as Avro, Parquet, CSV, Text, HTML, XML, PDF, MS-Office, etc. are
provided out of the box, and additional custom commands and parsers for
additional file or data formats can be added as custom morphline commands.
Any kind of data format can be processed and any kind output format can be
generated by any custom Morphline ETL logic. Also, this phase can be used
to send data directly to a live SolrCloud cluster (via the loadSolr
morphline command).
The program is implemented as a Crunch pipeline and as such Crunch
optimizes the logical phases mentioned above into an efficient physical
execution plan that runs a single mapper-only job, or as the corresponding
Spark equivalent.
Fault Tolerance: Task attempts are retried on failure per the standard
MapReduce or Spark semantics. If the whole job fails you can retry simply
by rerunning the program again using the same arguments.
Comparison with MapReduceIndexerTool:
1) CrunchIndexerTool can also run on the Spark execution engine, not just
on MapReduce.
2) CrunchIndexerTool enables interactive low latency prototyping, in
particular in Spark 'local' mode.
3) CrunchIndexerTool supports updates (and deletes) of existing documents
in Solr, not just inserts.
4) CrunchIndexerTool can exploit data locality for splittable Hadoop files
(text, avro, avroParquet).
We recommend MapReduceIndexerTool for large scale batch ingestion use
cases where updates (or deletes) of existing documents in Solr are not
required, and we recommend CrunchIndexerTool for all other use cases.
CrunchIndexerOptions:
HDFS_URI HDFS URI of file or directory tree to ingest.
(default: [])
--input-file-list URI, --input-list URI
Local URI or HDFS URI of a UTF-8 encoded file
containing a list of HDFS URIs to ingest, one URI
per line in the file. If '-' is specified, URIs
are read from the standard input. Multiple --
input-file-list arguments can be specified.
--input-file-format FQCN
The Hadoop FileInputFormat to use for extracting
data from splittable HDFS files. Can be a fully
qualified Java class name or one of ['text',
'avro', 'avroParquet']. If this option is present
the extraction phase will emit a series of input
data records rather than a series of HDFS file
input streams.
--input-file-projection-schema FILE
Relative or absolute path to an Avro schema file
on the local file system. This will be used as
the projection schema for Parquet input files.
--input-file-reader-schema FILE
Relative or absolute path to an Avro schema file
on the local file system. This will be used as
the reader schema for Avro or Parquet input
files. Example: src/test/resources/test-
documents/strings.avsc
--morphline-file FILE Relative or absolute path to a local config file
that contains one or more morphlines. The file
must be UTF-8 encoded. It will be uploaded to
each remote task. Example: /path/to/morphline.conf
--morphline-id STRING The identifier of the morphline that shall be
executed within the morphline config file
specified by --morphline-file. If the --morphline-
id option is omitted the first (i.e. top-most)
morphline within the config file is used.
Example: morphline1
--pipeline-type STRING
The engine to use for executing the job. Can be
'mapreduce' or 'spark'. (default: mapreduce)
--xhelp, --help, -help
Show this help message and exit
--mappers INTEGER Tuning knob that indicates the maximum number of
MR mapper tasks to use. -1 indicates use all map
slots available on the cluster. This parameter
only applies to non-splittable input files
(default: -1)
--parallel-morphline-inits INTEGER
Tuning knob that indicates the maximum number of
morphline instances to initialize at the same
time. This kind of rate limiting on rampup can be
useful to avoid overload conditions such as
ZooKeeper connection limits or DNS lookup limits
when using many parallel mapper tasks because
each such task contains one morphline. 1
indicates initialize each morphline separately.
This feature is implemented with a distributed
semaphore. The default is to use no rate limiting
(default: 2147483647)
--dry-run Run the pipeline but print documents to stdout
instead of loading them into Solr. This can be
used for quicker turnaround during early trial &
debug sessions. (default: false)
--log4j FILE Relative or absolute path to a log4j.properties
config file on the local file system. This file
will be uploaded to each remote task. Example:
/path/to/log4j.properties
--chatty Turn on verbose output. (default: false)
SparkGenericOptions: To print all options run 'spark-submit --help'
MapReduceGenericOptions: Generic options supported are
--conf <configuration file>
specify an application configuration file
-D <property=value> use value for given property
--fs <local|namenode:port>
specify a namenode
--jt <local|resourcemanager:port>
specify a ResourceManager
--files <comma separated list of files>
specify comma separated files to be copied to the
map reduce cluster
--libjars <comma separated list of jars>
specify comma separated jar files to include in
the classpath.
--archives <comma separated list of archives>
specify comma separated archives to be unarchived
on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Examples:
# Prepare - Copy input files into HDFS:
export myResourcesDir=src/test/resources # for build from git
export myResourcesDir=/opt/cloudera/parcels/CDH/share/doc/search-*/search-crunch # for CDH with parcels
export myResourcesDir=/usr/share/doc/search-*/search-crunch # for CDH with packages
hadoop fs -copyFromLocal $myResourcesDir/test-documents/hello1.txt hdfs:/user/systest/input/
# Prepare variables for convenient reuse:
export myDriverJarDir=target # for build from git
export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch # for CDH with parcels
export myDriverJarDir=/usr/lib/solr/contrib/crunch # for CDH with packages
export myDependencyJarDir=target/lib # for build from git
export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch # for CDH with parcels
export myDependencyJarDir=/usr/lib/search/lib/search-crunch # for CDH with packages
export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar')
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)
export myDependencyJarPaths=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ':' | head -c -1)
export myJVMOptions="-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/" # connection settings for solrj, also custom tmp dir
# MapReduce on Yarn - Ingest text file line by line into Solr:
export HADOOP_CLIENT_OPTS="$myJVMOptions"; export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop \
--config /etc/hadoop/conf.cloudera.YARN-1 \
jar $myDriverJar org.apache.solr.crunch.CrunchIndexerTool \
--libjars $myDependencyJarFiles \
-D mapreduce.map.java.opts="-Xmx500m $myJVMOptions" \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
--files $myResourcesDir/test-documents/string.avsc \
--morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \
--pipeline-type mapreduce \
--chatty \
--log4j $myResourcesDir/log4j.properties \
/user/systest/input/hello1.txt
# Spark in Local Mode (for rapid prototyping) - Ingest into Solr:
spark-submit \
--master local \
--deploy-mode client \
--jars $myDependencyJarFiles \
--executor-memory 500M \
--conf "spark.executor.extraJavaOptions=$myJVMOptions" \
--driver-java-options "$myJVMOptions" \
# --driver-library-path /opt/cloudera/parcels/CDH/lib/hadoop/lib/native # for Snappy on CDH with parcels\
# --driver-library-path /usr/lib/hadoop/lib/native # for Snappy on CDH with packages \
--class org.apache.solr.crunch.CrunchIndexerTool \
$myDriverJar \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
--morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \
--pipeline-type spark \
--chatty \
--log4j $myResourcesDir/log4j.properties \
/user/systest/input/hello1.txt
# Spark on Yarn in Client Mode (for testing) - Ingest into Solr:
Same as above, except replace '--master local' with '--master yarn'
# View the yarn executor log files (there is no GUI yet):
yarn logs --applicationId $application_XYZ
# Spark on Yarn in Cluster Mode (for production) - Ingest into Solr:
spark-submit \
--master yarn \
--deploy-mode cluster \
--jars $myDependencyJarFiles \
--executor-memory 500M \
--conf "spark.executor.extraJavaOptions=$myJVMOptions" \
--driver-java-options "$myJVMOptions" \
--class org.apache.solr.crunch.CrunchIndexerTool \
--files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf)\
$myDriverJar \
-D hadoop.tmp.dir=/tmp \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
--morphline-file loadSolrLine.conf \
--pipeline-type spark \
--chatty \
--log4j log4j.properties \
/user/systest/input/hello1.txt
# Spark on Yarn in Cluster Mode (for production) - Ingest into Secure (Kerberos-enabled) Solr:
# Spark requires two additional steps compared to non-secure solr:
# (NOTE: MapReduce does not require extra steps for communicating with kerberos-enabled Solr)
# 1) Create a delegation token file
# a) kinit as the user who will make solr requests
# b) request a delegation token from solr and save it to a file:
# e.g. using curl:
# "curl --negotiate -u: http://solr-host:port/solr/admin?op=GETDELEGATIONTOKEN > tokenFile.txt"
# 2) Pass the delegation token file to spark-submit:
# a) add the delegation token file via --files
# b) pass the file name via -D tokenFile
# spark places this file in the cwd of the executor, so only list the file name, no path
spark-submit \
--master yarn \
--deploy-mode cluster \
--jars $myDependencyJarFiles \
--executor-memory 500M \
--conf "spark.executor.extraJavaOptions=$myJVMOptions" \
--driver-java-options "$myJVMOptions" \
--class org.apache.solr.crunch.CrunchIndexerTool \
--files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf),tokenFile.txt\
$myDriverJar \
-D hadoop.tmp.dir=/tmp \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
-DtokenFile=tokenFile.txt \
--morphline-file loadSolrLine.conf \
--pipeline-type spark \
--chatty \
--log4j log4j.properties \
/user/systest/input/hello1.txt