Spark Indexing
If you are using Apache Spark, you can batch index data using CrunchIndexerTool.
CrunchIndexerTool is a Spark or MapReduce ETL batch job that pipes data from HDFS files into Apache Solr through a morphline for extraction and transformation. The program is designed for flexible, scalable, fault-tolerant batch ETL pipeline jobs. It is implemented as an Apache Crunch pipeline, allowing it to run on MapReduce or Spark execution engines.
CrunchIndexerTool requires a working MapReduce or Spark cluster, such as one installed using Cloudera Manager.
You can see the usage syntax CrunchIndexerTool
by
running the job with the -help
argument. Unlike other
Search indexing tools, the CrunchIndexerTool
jar does not
contain all dependencies. If you try to run the job without addressing
this, you get an error such as the following:
hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/crunch/types/PType at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.hadoop.util.RunJar.run(RunJar.java:214) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) Caused by: java.lang.ClassNotFoundException: org.apache.crunch.types.PType at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 4 more
To see the command usage (or to run the job), you must first add the dependencies to the classpath:
export HADOOP_CLASSPATH="/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch/*" hadoop jar /opt/cloudera/parcels/CDH/lib/solr/contrib/crunch/search-crunch.jar org.apache.solr.crunch.CrunchIndexerTool -help
For reference, here is the command usage syntax:
MapReduceUsage: export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop jar $myDriverJar org.apache.solr.crunch.CrunchIndexerTool --libjars $myDependencyJarFiles [MapReduceGenericOptions]... [--input-file-list URI] [--input-file-format FQCN] [--input-file-projection-schema FILE] [--input-file-reader-schema FILE] --morphline-file FILE [--morphline-id STRING] [--pipeline-type STRING] [--xhelp] [--mappers INTEGER] [--parallel-morphline-inits INTEGER] [--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]] SparkUsage: spark-submit [SparkGenericOptions]... --master local|yarn --deploy-mode client|cluster --jars $myDependencyJarFiles --class org.apache.solr.crunch.CrunchIndexerTool $myDriverJar [--input-file-list URI] [--input-file-format FQCN] [--input-file-projection-schema FILE] [--input-file-reader-schema FILE] --morphline-file FILE [--morphline-id STRING] [--pipeline-type STRING] [--xhelp] [--mappers INTEGER] [--parallel-morphline-inits INTEGER] [--dry-run] [--log4j FILE] [--chatty] [HDFS_URI [HDFS_URI ...]] Spark or MapReduce ETL batch job that pipes data from (splittable or non- splittable) HDFS files into Apache Solr, and along the way runs the data through a Morphline for extraction and transformation. The program is designed for flexible, scalable and fault-tolerant batch ETL pipeline jobs. It is implemented as an Apache Crunch pipeline and as such can run on either the Apache Hadoop MapReduce or Apache Spark execution engine. The program proceeds in several consecutive phases, as follows: 1) Randomization phase: This (parallel) phase randomizes the list of HDFS input files in order to spread ingestion load more evenly among the mapper tasks of the subsequent phase. This phase is only executed for non- splittables files, and skipped otherwise. 2) Extraction phase: This (parallel) phase emits a series of HDFS file input streams (for non-splittable files) or a series of input data records (for splittable files). 3) Morphline phase: This (parallel) phase receives the items of the previous phase, and uses a Morphline to extract the relevant content, transform it and load zero or more documents into Solr. The ETL functionality is flexible and customizable using chains of arbitrary morphline commands that pipe records from one transformation command to another. Commands to parse and transform a set of standard data formats such as Avro, Parquet, CSV, Text, HTML, XML, PDF, MS-Office, etc. are provided out of the box, and additional custom commands and parsers for additional file or data formats can be added as custom morphline commands. Any kind of data format can be processed and any kind output format can be generated by any custom Morphline ETL logic. Also, this phase can be used to send data directly to a live SolrCloud cluster (via the loadSolr morphline command). The program is implemented as a Crunch pipeline and as such Crunch optimizes the logical phases mentioned above into an efficient physical execution plan that runs a single mapper-only job, or as the corresponding Spark equivalent. Fault Tolerance: Task attempts are retried on failure per the standard MapReduce or Spark semantics. If the whole job fails you can retry simply by rerunning the program again using the same arguments. Comparison with MapReduceIndexerTool: 1) CrunchIndexerTool can also run on the Spark execution engine, not just on MapReduce. 2) CrunchIndexerTool enables interactive low latency prototyping, in particular in Spark 'local' mode. 3) CrunchIndexerTool supports updates (and deletes) of existing documents in Solr, not just inserts. 4) CrunchIndexerTool can exploit data locality for splittable Hadoop files (text, avro, avroParquet). We recommend MapReduceIndexerTool for large scale batch ingestion use cases where updates (or deletes) of existing documents in Solr are not required, and we recommend CrunchIndexerTool for all other use cases. CrunchIndexerOptions: HDFS_URI HDFS URI of file or directory tree to ingest. (default: []) --input-file-list URI, --input-list URI Local URI or HDFS URI of a UTF-8 encoded file containing a list of HDFS URIs to ingest, one URI per line in the file. If '-' is specified, URIs are read from the standard input. Multiple -- input-file-list arguments can be specified. --input-file-format FQCN The Hadoop FileInputFormat to use for extracting data from splittable HDFS files. Can be a fully qualified Java class name or one of ['text', 'avro', 'avroParquet']. If this option is present the extraction phase will emit a series of input data records rather than a series of HDFS file input streams. --input-file-projection-schema FILE Relative or absolute path to an Avro schema file on the local file system. This will be used as the projection schema for Parquet input files. --input-file-reader-schema FILE Relative or absolute path to an Avro schema file on the local file system. This will be used as the reader schema for Avro or Parquet input files. Example: src/test/resources/test- documents/strings.avsc --morphline-file FILE Relative or absolute path to a local config file that contains one or more morphlines. The file must be UTF-8 encoded. It will be uploaded to each remote task. Example: /path/to/morphline.conf --morphline-id STRING The identifier of the morphline that shall be executed within the morphline config file specified by --morphline-file. If the --morphline- id option is omitted the first (i.e. top-most) morphline within the config file is used. Example: morphline1 --pipeline-type STRING The engine to use for executing the job. Can be 'mapreduce' or 'spark'. (default: mapreduce) --xhelp, --help, -help Show this help message and exit --mappers INTEGER Tuning knob that indicates the maximum number of MR mapper tasks to use. -1 indicates use all map slots available on the cluster. This parameter only applies to non-splittable input files (default: -1) --parallel-morphline-inits INTEGER Tuning knob that indicates the maximum number of morphline instances to initialize at the same time. This kind of rate limiting on rampup can be useful to avoid overload conditions such as ZooKeeper connection limits or DNS lookup limits when using many parallel mapper tasks because each such task contains one morphline. 1 indicates initialize each morphline separately. This feature is implemented with a distributed semaphore. The default is to use no rate limiting (default: 2147483647) --dry-run Run the pipeline but print documents to stdout instead of loading them into Solr. This can be used for quicker turnaround during early trial & debug sessions. (default: false) --log4j FILE Relative or absolute path to a log4j.properties config file on the local file system. This file will be uploaded to each remote task. Example: /path/to/log4j.properties --chatty Turn on verbose output. (default: false) SparkGenericOptions: To print all options run 'spark-submit --help' MapReduceGenericOptions: Generic options supported are --conf <configuration file> specify an application configuration file -D <property=value> use value for given property --fs <local|namenode:port> specify a namenode --jt <local|resourcemanager:port> specify a ResourceManager --files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster --libjars <comma separated list of jars> specify comma separated jar files to include in the classpath. --archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines. The general command line syntax is bin/hadoop command [genericOptions] [commandOptions] Examples: # Prepare - Copy input files into HDFS: export myResourcesDir=src/test/resources # for build from git export myResourcesDir=/opt/cloudera/parcels/CDH/share/doc/search-*/search-crunch # for CDH with parcels export myResourcesDir=/usr/share/doc/search-*/search-crunch # for CDH with packages hadoop fs -copyFromLocal $myResourcesDir/test-documents/hello1.txt hdfs:/user/systest/input/ # Prepare variables for convenient reuse: export myDriverJarDir=target # for build from git export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch # for CDH with parcels export myDriverJarDir=/usr/lib/solr/contrib/crunch # for CDH with packages export myDependencyJarDir=target/lib # for build from git export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch # for CDH with parcels export myDependencyJarDir=/usr/lib/search/lib/search-crunch # for CDH with packages export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar') export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1) export myDependencyJarPaths=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ':' | head -c -1) export myJVMOptions="-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/" # connection settings for solrj, also custom tmp dir # MapReduce on Yarn - Ingest text file line by line into Solr: export HADOOP_CLIENT_OPTS="$myJVMOptions"; export HADOOP_CLASSPATH=$myDependencyJarPaths; hadoop \ --config /etc/hadoop/conf.cloudera.YARN-1 \ jar $myDriverJar org.apache.solr.crunch.CrunchIndexerTool \ --libjars $myDependencyJarFiles \ -D mapreduce.map.java.opts="-Xmx500m $myJVMOptions" \ -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \ --files $myResourcesDir/test-documents/string.avsc \ --morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \ --pipeline-type mapreduce \ --chatty \ --log4j $myResourcesDir/log4j.properties \ /user/systest/input/hello1.txt # Spark in Local Mode (for rapid prototyping) - Ingest into Solr: spark-submit \ --master local \ --deploy-mode client \ --jars $myDependencyJarFiles \ --executor-memory 500M \ --conf "spark.executor.extraJavaOptions=$myJVMOptions" \ --driver-java-options "$myJVMOptions" \ # --driver-library-path /opt/cloudera/parcels/CDH/lib/hadoop/lib/native # for Snappy on CDH with parcels\ # --driver-library-path /usr/lib/hadoop/lib/native # for Snappy on CDH with packages \ --class org.apache.solr.crunch.CrunchIndexerTool \ $myDriverJar \ -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \ --morphline-file $myResourcesDir/test-morphlines/loadSolrLine.conf \ --pipeline-type spark \ --chatty \ --log4j $myResourcesDir/log4j.properties \ /user/systest/input/hello1.txt # Spark on Yarn in Client Mode (for testing) - Ingest into Solr: Same as above, except replace '--master local' with '--master yarn' # View the yarn executor log files (there is no GUI yet): yarn logs --applicationId $application_XYZ # Spark on Yarn in Cluster Mode (for production) - Ingest into Solr: spark-submit \ --master yarn \ --deploy-mode cluster \ --jars $myDependencyJarFiles \ --executor-memory 500M \ --conf "spark.executor.extraJavaOptions=$myJVMOptions" \ --driver-java-options "$myJVMOptions" \ --class org.apache.solr.crunch.CrunchIndexerTool \ --files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf)\ $myDriverJar \ -D hadoop.tmp.dir=/tmp \ -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \ --morphline-file loadSolrLine.conf \ --pipeline-type spark \ --chatty \ --log4j log4j.properties \ /user/systest/input/hello1.txt # Spark on Yarn in Cluster Mode (for production) - Ingest into Secure (Kerberos-enabled) Solr: # Spark requires two additional steps compared to non-secure solr: # (NOTE: MapReduce does not require extra steps for communicating with kerberos-enabled Solr) # 1) Create a delegation token file # a) kinit as the user who will make solr requests # b) request a delegation token from solr and save it to a file: # e.g. using curl: # "curl --negotiate -u: http://solr-host:port/solr/admin?op=GETDELEGATIONTOKEN > tokenFile.txt" # 2) Pass the delegation token file to spark-submit: # a) add the delegation token file via --files # b) pass the file name via -D tokenFile # spark places this file in the cwd of the executor, so only list the file name, no path spark-submit \ --master yarn \ --deploy-mode cluster \ --jars $myDependencyJarFiles \ --executor-memory 500M \ --conf "spark.executor.extraJavaOptions=$myJVMOptions" \ --driver-java-options "$myJVMOptions" \ --class org.apache.solr.crunch.CrunchIndexerTool \ --files $(ls $myResourcesDir/log4j.properties),$(ls $myResourcesDir/test-morphlines/loadSolrLine.conf),tokenFile.txt\ $myDriverJar \ -D hadoop.tmp.dir=/tmp \ -D morphlineVariable.ZK_HOST=$(hostname):2181/solr \ -DtokenFile=tokenFile.txt \ --morphline-file loadSolrLine.conf \ --pipeline-type spark \ --chatty \ --log4j log4j.properties \ /user/systest/input/hello1.txt