MapReduceIndexerTool

MapReduceIndexerTool is a MapReduce batch job driver that takes a morphline, creates a set of Solr index shards from a set of input files, and writes the indexes into HDFS in a flexible, scalable, and fault-tolerant manner. It also supports merging the output shards into a set of live customer-facing Solr servers, typically a SolrCloud.

More details are available through the command line help:
$ hadoop jar target/search-mr-*-job.jar \
  org.apache.solr.hadoop.MapReduceIndexerTool --help

usage: hadoop [GenericOptions]... jar search-mr-*-job.jar \
  org.apache.solr.hadoop.MapReduceIndexerTool
       [--help] --output-dir HDFS_URI [--input-list URI]
       --morphline-file FILE [--morphline-id STRING]
       [--update-conflict-resolver FQCN] [--mappers INTEGER]
       [--reducers INTEGER] [--max-segments INTEGER]
       [--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
       [--verbose] [--show-non-solr-cloud] [--zk-host STRING] [--go-live]
       [--collection STRING] [--go-live-threads INTEGER]
       [HDFS_URI [HDFS_URI ...]]

The MapReduce batch job is a driver that takes a  morphline  and creates a
set of Solr index shards from a set of input  files and writes the indexes
into HDFS,  in a flexible,  scalable and  fault-tolerant manner.  It  also
supports merging the output shards into a set of live customer facing Solr
servers,  typically  a  SolrCloud.   The   program   proceeds  in  several
consecutive MapReduce based phases, as follows:

1) Randomization phase:  This  (parallel)  phase  randomizes  the  list of
input files in  order  to  spread  indexing  load  more  evenly  among the
mappers of the subsequent phase.

2) Mapper phase: This  (parallel)  phase  takes  the input files, extracts
the relevant content, transforms it and  hands SolrInputDocuments to a set
of reducers. The  ETL  functionality  is  flexible  and customizable using
chains  of  arbitrary  morphline  commands  that  pipe  records  from  one
transformation command to another. Commands  to  parse and transform a set
of standard data formats such as  Avro,  CSV,  Text, HTML, XML, PDF, Word,
or Excel  are  provided  out  of  the  box, and additional custom commands
and parsers for additional file or data  formats can be added as morphline
plug-ins. This is  done  by  implementing  a  simple  Java  interface that
consumes a record (for example a file in the form of an  InputStream  plus
some headers plus contextual metadata)  and  generates  as  output zero or
more  records.  Any  kind of  data  format  can  be  indexed and  any Solr
documents for any kind of Solr schema  can  be  generated,  and any custom
ETL logic can be registered and executed.
Record fields, including MIME  types,  can  also  explicitly  be passed by
force  from  the  CLI  to  the  morphline,  for  example:  hadoop  ...  -D
morphlineField._attachment_mimetype=text/csv

3)   Reducer   phase:   This   (parallel)   phase   loads   the   mapper's
SolrInputDocuments into  one  EmbeddedSolrServer  per  reducer.  Each such
reducer and Solr server can be seen  as  a (micro) shard. The Solr servers
store their data in HDFS.

4) Mapper-only merge  phase:  This  (parallel)  phase  merges  the  set of
reducer shards into the number of Solr  shards expected by the user, using
a mapper-only job. This phase is  omitted  if  the number of reducer shards
is already equal to the number of shards expected by the user.

5) Go-live phase: This optional (parallel)  phase merges the output shards
of the previous phase into  a  set  of  live customer facing Solr servers,
typically a SolrCloud. If this phase  is  omitted you can explicitly point
each Solr server to one of the HDFS output shard directories.

Fault Tolerance: Mapper and reducer  task  attempts are retried on failure
per the standard MapReduce semantics. On  program  startup all data in the
--output-dir is deleted if that  output  directory  already exists. If the
whole job fails you can retry simply  by  rerunning  the program using the
same arguments.

positional arguments:
  HDFS_URI               HDFS URI of  file  or  directory  tree  to index.
                         (default: [])

optional arguments:
  --help, -help, -h      Show this help message and exit
  --input-list URI       Local URI or HDFS  URI  of  a  UTF-8 encoded file
                         containing a list of HDFS  URIs to index, one URI
                         per line in the file.  If  '-' is specified, URIs
                         are read from  the  standard  input.  Multiple --
                         input-list arguments can be specified.
  --morphline-id STRING  The identifier of  the  morphline  that  shall be
                         executed  within   the   morphline   config  file
                         specified  by   --morphline-file.   If   the   --
                         morphline-id option is omitted the first (meaning
                         the top-most) morphline within the config file is
                         used. Example: morphline1
  --update-conflict-resolver FQCN
                         Fully qualified class name  of  a Java class that
                         implements the  UpdateConflictResolver interface.
                         This enables  deduplication  and  ordering  of  a
                         series of document  updates  for  the same unique
                         document key. For example,  a MapReduce batch job
                         might index multiple files in  the same job where
                         some of the files  contain  old  and new versions
                         of the very same document,  using the same unique
                         document key.
                         Typically,  implementations  of   this  interface
                         forbid collisions by  throwing  an  exception, or
                         ignore all but the  most recent document version,
                         or, in the general  case, order colliding updates
                         ascending  from  least  recent   to  most  recent
                         (partial) update. The  caller  of  this interface
                         (i.e. the Hadoop  Reducer)  will  then  apply the
                         updates to Solr  in  the  order  returned  by the
                         orderUpdates() method.
                         The default RetainMostRecentUpdateConflictResolver
                         implementation ignores all but the  most  recent
                         document version, based on a configurable numeric
                         Solr field, which defaults to the file_last_modified
                         timestamp (default: org.apache.solr.hadoop.dedup.
                         RetainMostRecentUpdateConflictResolver)
  --mappers INTEGER      Tuning knob that indicates  the maximum number of
                         MR mapper tasks to use.  -1 indicates use all map
                         slots available on the cluster. (default: -1)
  --reducers INTEGER     Tuning knob that indicates the number of reducers
                         into  which to  index.  To use  one  reducer  per
                         output shard, use 0 for Search 1.x and use -2 for
                         Search  for CDH 5. Using one reducer  per  output
                         shard disables the mtree  merge MR algorithm. The
                         mtree merge MR algorithm  improves scalability by
                         distributing  CPU load  among a  set of  parallel
                         reducers  that can  be  more  numerous  than  the
                         number  of Solr  shards expected  by the user. It
                         can be seen as an extension  of concurrent lucene
                         merges and tiered  lucene merges to the clustered
                         case. -1 indicates use all reduce slots available
                         on the cluster. The  subsequent mapper-only phase
                         merges the reducer output to the number of shards
                         expected  by  the  user,  again  by  utilizing  a
                         cluster's parallelism. (default: -1)
  --max-segments INTEGER
                         Tuning knob that indicates  the maximum number of
                         segments to be contained  on  output in the index
                         of each reducer shard. After  a reducer has built
                         its output index  it  applies  a  merge policy to
                         merge segments  until  there  are  <= maxSegments
                         lucene  segments  left  in  this  index.  Merging
                         segments involves reading and  rewriting all data
                         in all these segment  files, potentially multiple
                         times, which  is  very  I/O  intensive  and  time
                         consuming. However, an index  with fewer segments
                         can later be merged faster,  and  it can later be
                         queried faster  once  deployed  to  a  live  Solr
                         serving shard. Set maxSegments  to  1 to optimize
                         the index for low  query  latency. In a nutshell,
                         a  small   maxSegments   value   trades  indexing
                         latency for subsequently  improved query latency.
                         This can  be  a  reasonable  trade-off  for batch
                         indexing systems. (default: 1)
  --fair-scheduler-pool STRING
                         Optional tuning knob that  indicates  the name of
                         the fair scheduler pool  to  submit  jobs to. The
                         Fair   Scheduler   is   a   pluggable   MapReduce
                         scheduler that  provides  a  way  to  share large
                         clusters.  Fair  scheduling   is   a   method  of
                         assigning resources to  jobs  such  that all jobs
                         get, on  average,  an  equal  share  of resources
                         over time. When there  is  a  single job running,
                         that job  uses  the  entire  cluster.  When other
                         jobs are submitted, task  slots  that  free up are
                         assigned to the new jobs,  so  that each job gets
                         roughly the same amount  of  CPU time. Unlike the
                         default Hadoop scheduler, which  forms a queue of
                         jobs, this lets short  jobs  finish in reasonable
                         time while not starving long  jobs. It is also an
                         easy way to share  a  cluster  between  multiple
                         users.  Fair  sharing  can  also  work  with  job
                         priorities - the priorities  are  used as weights
                         to determine the fraction  of  total compute time
                         that each job gets.
  --dry-run              Run in local mode  and  print documents to stdout
                         instead of loading them  into Solr. This executes
                         the morphline  in  the  client  process  (without
                         submitting a job  to  MR)  for quicker turnaround
                         during early trial and debug  sessions. (default:
                         false)
  --log4j FILE           Relative or absolute  path  to a log4j.properties
                         config file on the  local  file system. This file
                         will  be  uploaded  to  each  MR  task.  Example:
                         /path/to/log4j.properties
  --verbose, -v          Turn on verbose output. (default: false)
  --show-non-solr-cloud  Also show options for  Non-SolrCloud mode as part
                         of --help. (default: false)

Required arguments:
  --output-dir HDFS_URI  HDFS directory to write  Solr  indexes to. Inside
                         there one  output  directory  per  shard  will be
                         generated.    Example:    hdfs://c2202.mycompany.
                         com/user/$USER/test
  --morphline-file FILE  Relative or absolute path to  a local config file
                         that contains one  or  more  morphlines. The file
                         must     be      UTF-8      encoded.     Example:
                         /path/to/morphline.conf

Cluster arguments:
  Arguments that provide information about your Solr cluster.

  --zk-host STRING       The address of  a  ZooKeeper  ensemble being used
                         by a SolrCloud  cluster.  This ZooKeeper ensemble
                         will be  examined  to  determine  the  number  of
                         output shards to create as  well as the Solr URLs
                         to merge the output shards into when using the --
                         go-live option. Requires  that  you  also  pass
                         --collection to name the collection to merge the
                         shards into.

                         The  --zk-host   option   implements   the   same
                         partitioning semantics as  the standard SolrCloud
                         Near-Real-Time (NRT) API.  This  allows  mixing
                         batch  updates  from   MapReduce  ingestion  with
                         updates from standard Solr  NRT  ingestion on the
                         same SolrCloud  cluster,  using  identical unique
                         document keys.

                         Format is: a  list  of  comma separated host:port
                         pairs,  each  corresponding   to   a  zk  server.
                         Example:               '127.0.0.1:2181,127.0.0.1:
                         2182,127.0.0.1:2183'  If   the   optional  chroot
                         suffix is  used  the  example  would  look  like:
                         '127.0.0.1:2181/solr,127.0.0.1:2182/solr,
                         127.0.0.1:2183/solr' where  the  client  would be
                         rooted  at  '/solr'  and   all   paths  would  be
                         relative     to     this      root     -     i.e.
                         getting/setting/etc...  '/foo/bar'  would  result
                         in operations being run  on '/solr/foo/bar' (from
                         the server perspective).


Go live arguments:
  Arguments for merging  the  shards  that  are  built  into  a  live Solr
  cluster. Also see the Cluster arguments.

  --go-live              Allows you to  optionally  merge  the final index
                         shards into a live  Solr  cluster  after they are
                         built. You can pass the ZooKeeper address with --
                         zk-host  and  the  relevant  cluster  information
                         will be auto detected.  (default: false)
  --collection STRING    The SolrCloud  collection  to  merge  shards into
                         when  using  --go-live  and  --zk-host.  Example:
                         collection1
  --go-live-threads INTEGER
                         Tuning knob that indicates  the maximum number of
                         live merges  to  run  in  parallel  at  one time.
                         (default: 1000)

Generic options supported are
  --conf <configuration FILE>
                         specify an application configuration file
  -D <property=value>    use value for given property
  --fs <local|namenode:port>
                         specify a namenode
  --jt <local|jobtracker:port>
                         specify a job tracker
  --files <comma separated list of files>
                         specify comma separated  files  to  be  copied to
                         the map reduce cluster
  --libjars <comma separated list of jars>
                         specify comma separated jar  files  to include in
                         the classpath.
  --archives <comma separated list of archives>
                         specify   comma   separated    archives   to   be
                         unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

Examples:

# Index an Avro-based Twitter tweet file into a live SolrCloud cluster:
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --go-live \
  hdfs:///user/foo/indir
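
# Preview what the above job would feed to Solr without submitting a MapReduce
# job: per the --dry-run option described above, the morphline runs in the
# client process and documents are printed to stdout instead of being loaded
# into Solr. (A hedged sketch: the flag combination is assumed to be accepted
# by your release; paths and host names are the same illustrative ones as
# above.)
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --dry-run \
  hdfs:///user/foo/indir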

# Index all files that match all of the following conditions:
# 1) File is contained in dir tree hdfs:///user/$USER/solrloadtest/twitter/tweets
# 2) File name matches the glob pattern 'sample-statuses*.gz'
# 3) File was last modified less than 1000000 minutes ago
# 4) File size is between 1 MB and 100 MB
# Also include an extra library jar file containing the JSON tweet Java parser:
hadoop jar target/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool \
  -find hdfs:///user/$USER/solrloadtest/twitter/tweets \
  -type f \
  -name 'sample-statuses*.gz' \
  -mmin -1000000 \
  -size -100000000c \
  -size +1000000c \
  | sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --libjars /path/to/cdk-morphlines-twitter-0.9.2.jar \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --input-list -
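
# The --input-list option also accepts a local URI or HDFS URI of a UTF-8
# encoded file containing one HDFS URI per line, instead of reading the list
# from stdin. For example, capture the HdfsFindTool output in a local file
# first and pass that file to the indexer job. (A hedged sketch: the
# /tmp/tweet-files.txt path and the file:// URI form are illustrative
# assumptions; the remaining arguments mirror the example above.)
hadoop jar target/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool \
  -find hdfs:///user/$USER/solrloadtest/twitter/tweets \
  -type f \
  -name 'sample-statuses*.gz' \
  > /tmp/tweet-files.txt

sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --libjars /path/to/cdk-morphlines-twitter-0.9.2.jar \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --input-list file:///tmp/tweet-files.txt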