HBaseMapReduceIndexerTool
HBaseMapReduceIndexerTool is a MapReduce batch job driver that takes input data from an HBase table, creates Solr index shards, and writes the indexes into HDFS in a flexible,
scalable, and fault-tolerant manner. It also supports merging the output shards into a set of live customer-facing Solr servers in SolrCloud.
- To invoke the command-line help in a default parcels installation, use:
$ hadoop jar /opt/cloudera/parcels/CDH-*/jars/hbase-indexer-mr-*-job.jar --help
- To invoke the command-line help in a default packages installation, use:
$ hadoop jar /usr/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar --help
usage: hadoop [GenericOptions]... jar hbase-indexer-mr-*-job.jar
[--hbase-indexer-zk STRING] [--hbase-indexer-name STRING]
[--hbase-indexer-file FILE]
[--hbase-indexer-component-factory STRING]
[--hbase-table-name STRING] [--hbase-start-row BINARYSTRING]
[--hbase-end-row BINARYSTRING] [--hbase-start-time STRING]
[--hbase-end-time STRING] [--hbase-timestamp-format STRING]
[--zk-host STRING] [--go-live] [--collection STRING]
[--go-live-threads INTEGER] [--help] [--output-dir HDFS_URI]
[--overwrite-output-dir] [--morphline-file FILE]
[--morphline-id STRING] [--update-conflict-resolver FQCN]
[--reducers INTEGER] [--max-segments INTEGER]
[--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
[--verbose] [--clear-index] [--show-non-solr-cloud]
MapReduce batch job driver that takes input data from an HBase table and
creates Solr index shards and writes the indexes into HDFS, in a flexible,
scalable, and fault-tolerant manner. It also supports merging the output
shards into a set of live customer-facing Solr servers in SolrCloud.
Optionally, documents can be sent directly from the mapper tasks to
SolrCloud, which is a much less scalable approach but enables updating
existing documents in SolrCloud. The program proceeds in one or multiple
consecutive MapReduce-based phases, as follows:
1) Mapper phase: This (parallel) phase scans over the input HBase table,
extracts the relevant content, and transforms it into SolrInputDocuments.
If run as a mapper-only job, this phase also writes the SolrInputDocuments
directly to a live SolrCloud cluster. The conversion from HBase records
into Solr documents is performed via a hbase-indexer configuration and
typically based on a morphline.
2) Reducer phase: This (parallel) phase loads the mapper's
SolrInputDocuments into one EmbeddedSolrServer per reducer. Each such
reducer and Solr server can be seen as a (micro) shard. The Solr servers
store their data in HDFS.
3) Mapper-only merge phase: This (parallel) phase merges the set of
reducer shards into the number of Solr shards expected by the user, using
a mapper-only job. This phase is omitted if the number of shards is
already equal to the number of shards expected by the user.
4) Go-live phase: This optional (parallel) phase merges the output shards
of the previous phase into a set of live customer-facing Solr servers in
SolrCloud. If this phase is omitted you can explicitly point each Solr
server to one of the HDFS output shard directories.
Fault Tolerance: Mapper and reducer task attempts are retried on failure
per the standard MapReduce semantics. On program startup all data in the
--output-dir is deleted if that output directory already exists and
--overwrite-output-dir is specified. This means that if the whole job fails
you can retry simply by rerunning the program using the same
arguments.
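The retry behavior described under Fault Tolerance can be sketched as a small wrapper script. This is only an illustration, not part of the tool: `run_indexer` is a hypothetical stand-in for the real `hadoop jar ...` invocation (which would pass `--output-dir` together with `--overwrite-output-dir`), and the attempt limit of 5 is an arbitrary assumption.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the real job submission, for example:
#   hadoop jar hbase-indexer-mr-*-job.jar ... --output-dir "$OUT" --overwrite-output-dir
# Here it fails twice and then succeeds, purely to exercise the retry loop.
calls=0
run_indexer() {
  calls=$((calls + 1))
  [ "$calls" -ge 3 ]
}

# Rerun the job with identical arguments until it succeeds or attempts run out.
retry_indexer() {
  local max_attempts=$1 attempt=1
  until run_indexer; do
    if [ "$attempt" -ge "$max_attempts" ]; then
      echo "giving up after $attempt attempts" >&2
      return 1
    fi
    attempt=$((attempt + 1))
  done
  echo "succeeded on attempt $attempt"
}

retry_indexer 5
```

Because `--overwrite-output-dir` clears the output directory at startup, each rerun starts from a clean state and the arguments do not need to change between attempts.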
HBase Indexer parameters:
Parameters for specifying the HBase indexer definition and where it
should be loaded from.
--hbase-indexer-zk STRING
The address of the ZooKeeper ensemble from which
to fetch the indexer definition named
--hbase-indexer-name. Format is: a list of comma
separated host:port pairs, each corresponding to
a zk server. Example:
'127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183'
--hbase-indexer-name STRING
The name of the indexer configuration to fetch
from the ZooKeeper ensemble specified with
--hbase-indexer-zk. Example: myIndexer
--hbase-indexer-file FILE
Optional relative or absolute path to a local
HBase indexer XML configuration file. If supplied,
this overrides --hbase-indexer-zk and
--hbase-indexer-name. Example:
/path/to/morphline-hbase-mapper.xml
--hbase-indexer-component-factory STRING
Classname of the hbase indexer component factory.
HBase scan parameters:
Parameters for specifying what data is included while reading from HBase.
--hbase-table-name STRING
Optional name of the HBase table containing the
records to be indexed. If supplied, this
overrides the value from the --hbase-indexer-*
options. Example: myTable
--hbase-start-row BINARYSTRING
Binary string representation of start row from
which to start indexing (inclusive). The format
of the supplied row key should use two-digit hex
values prefixed by \x for non-ASCII characters
(e.g. 'row\x00'). The semantics of this argument are
the same as those for the HBase Scan#setStartRow
method. The default is to include the first row
of the table. Example: AAAA
--hbase-end-row BINARYSTRING
Binary string representation of end row prefix at
which to stop indexing (exclusive). See the
description of --hbase-start-row for more
information. The default is to include the last
row of the table. Example: CCCC
--hbase-start-time STRING
Earliest timestamp (inclusive) in time range of
HBase cells to be included for indexing. The
default is to include all cells. Example: 0
--hbase-end-time STRING
Latest timestamp (exclusive) of HBase cells to be
included for indexing. The default is to include
all cells. Example: 123456789
--hbase-timestamp-format STRING
Timestamp format to be used to interpret
--hbase-start-time and --hbase-end-time. This is a
java.text.SimpleDateFormat compliant format (see
http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html).
If this parameter is omitted then the
timestamps are interpreted as number of
milliseconds since the standard epoch (Unix
time). Example: "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
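When --hbase-timestamp-format is omitted, the start and end times must be given in milliseconds since the Unix epoch. The following is a minimal sketch of producing such values from calendar dates. It assumes GNU `date` (the `-d` option); the helper name `to_epoch_millis` and the chosen dates are illustrative only.

```shell
#!/usr/bin/env bash
# Convert a UTC date-time to milliseconds since the Unix epoch.
# Assumes GNU date (the -d option); BSD/macOS date uses different flags.
to_epoch_millis() {
  local seconds
  seconds=$(date -u -d "$1" +%s) || return 1
  echo "$((seconds * 1000))"
}

start_ms=$(to_epoch_millis '2014-01-01 00:00:00')
end_ms=$(to_epoch_millis '2014-02-01 00:00:00')

# Print the scan options that could be appended to the job command line.
echo "--hbase-start-time $start_ms --hbase-end-time $end_ms"
```

Since the start time is inclusive and the end time is exclusive, this pair selects cells written during January 2014 (UTC).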
Solr cluster arguments:
Arguments that provide information about your Solr cluster.
--zk-host STRING The address of a ZooKeeper ensemble being used by
a SolrCloud cluster. This ZooKeeper ensemble will
be examined to determine the number of output
shards to create as well as the Solr URLs to
merge the output shards into when using the
--go-live option. Requires that you also pass the
--collection to merge the shards into.
The --zk-host option implements the same
partitioning semantics as the standard SolrCloud
Near-Real-Time (NRT) API. This makes it possible to mix
batch updates from MapReduce ingestion with
updates from standard Solr NRT ingestion on the
same SolrCloud cluster, using identical unique
document keys.
Format is: a list of comma separated host:port
pairs, each corresponding to a zk server.
Example:
'127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183'
If the optional chroot suffix is used the
example would look like:
'127.0.0.1:2181/solr,127.0.0.1:2182/solr,127.0.0.1:2183/solr'
where
the client would be rooted at '/solr' and all
paths would be relative to this root - i.e.
getting/setting/etc... '/foo/bar' would result in
operations being run on '/solr/foo/bar' (from the
server perspective).
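The --zk-host format above can be assembled from a list of servers rather than typed by hand. The sketch below follows the chroot example in the help text, where the suffix is appended to every host:port pair. The function name `build_zk_host` is hypothetical and not part of the tool.

```shell
#!/usr/bin/env bash
# Join ZooKeeper host:port pairs into a --zk-host value, appending the chroot
# suffix to every pair as in the help text's example.
build_zk_host() {
  local chroot=$1; shift
  local result="" pair
  for pair in "$@"; do
    # ${result:+,} expands to a comma only when result is already non-empty.
    result+="${result:+,}${pair}${chroot}"
  done
  echo "$result"
}

zk_host=$(build_zk_host /solr 127.0.0.1:2181 127.0.0.1:2182 127.0.0.1:2183)
echo "--zk-host '$zk_host'"
```

Passing an empty string as the first argument yields the plain form without a chroot.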
Go live arguments:
Arguments for merging the shards that are built into a live Solr
cluster. Also see the Cluster arguments.
--go-live Allows you to optionally merge the final index
shards into a live Solr cluster after they are
built. You can pass the ZooKeeper address with
--zk-host and the relevant cluster information will
be auto detected. (default: false)
--collection STRING The SolrCloud collection to merge shards into
when using --go-live and --zk-host. Example:
collection1
--go-live-threads INTEGER
Tuning knob that indicates the maximum number of
live merges to run in parallel at one time.
(default: 1000)
Optional arguments:
--help, -help, -h Show this help message and exit
--output-dir HDFS_URI HDFS directory to write Solr indexes to. Inside
there one output directory per shard will be
generated. Example:
hdfs://c2202.mycompany.com/user/$USER/test
--overwrite-output-dir
Overwrite the directory specified by --output-dir
if it already exists. Using this parameter will
result in the output directory being recursively
deleted at job startup. (default: false)
--morphline-file FILE Relative or absolute path to a local config file
that contains one or more morphlines. The file
must be UTF-8 encoded. The file will be uploaded
to each MR task. If supplied, this overrides the
value from the --hbase-indexer-* options.
Example: /path/to/morphlines.conf
--morphline-id STRING The identifier of the morphline that shall be
executed within the morphline config file, e.g.
specified by --morphline-file. If the
--morphline-id option is omitted the first (i.e. top-most)
morphline within the config file is used. If
supplied, this overrides the value from the
--hbase-indexer-* options. Example: morphline1
--update-conflict-resolver FQCN
Fully qualified class name of a Java class that
implements the UpdateConflictResolver interface.
This enables deduplication and ordering of a
series of document updates for the same unique
document key. For example, a MapReduce batch job
might index multiple files in the same job where
some of the files contain old and new versions of
the very same document, using the same unique
document key.
Typically, implementations of this interface
forbid collisions by throwing an exception, or
ignore all but the most recent document version,
or, in the general case, order colliding updates
ascending from least recent to most recent
(partial) update. The caller of this interface
(i.e. the Hadoop Reducer) will then apply the
updates to Solr in the order returned by the
orderUpdates() method.
The default
RetainMostRecentUpdateConflictResolver
implementation ignores all but the most recent
document version, based on a configurable numeric
Solr field, which defaults to the
file_last_modified timestamp (default:
org.apache.solr.hadoop.dedup.RetainMostRecentUpdateConflictResolver)
--reducers INTEGER Tuning knob that indicates the number of reducers
to index into. 0 indicates that no reducers
should be used, and documents should be sent
directly from the mapper tasks to live Solr
servers. -1 indicates use all reduce slots
available on the cluster. -2 indicates use one
reducer per output shard, which disables the
mtree merge MR algorithm. The mtree merge MR
algorithm improves scalability by spreading load
(in particular CPU load) among a number of
parallel reducers that can be much larger than
the number of solr shards expected by the user.
It can be seen as an extension of concurrent
lucene merges and tiered lucene merges to the
clustered case. The subsequent mapper-only phase
merges the output of said large number of
reducers to the number of shards expected by the
user, again by utilizing more available
parallelism on the cluster. (default: -1)
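The special values of --reducers are easy to mix up, so the sketch below restates them as a lookup. It only mirrors the help text above; `describe_reducers` is a hypothetical helper, and how the tool treats values other than 0, -1, -2, or a positive count is not specified here.

```shell
#!/usr/bin/env bash
# Summarize what a given --reducers value means, per the help text.
describe_reducers() {
  case "$1" in
    0)  echo "mapper-only: documents go directly from mappers to live Solr servers" ;;
    -1) echo "use all reduce slots available on the cluster (default)" ;;
    -2) echo "one reducer per output shard; mtree merge is disabled" ;;
    *)
      if [[ "$1" =~ ^[1-9][0-9]*$ ]]; then
        echo "index into $1 reducers"
      else
        echo "value not described in the help text: $1" >&2
        return 1
      fi
      ;;
  esac
}

for value in 0 -1 -2 16; do
  printf '%s => %s\n' "--reducers $value" "$(describe_reducers "$value")"
done
```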
--max-segments INTEGER
Tuning knob that indicates the maximum number of
segments to be contained on output in the index
of each reducer shard. After a reducer has built
its output index it applies a merge policy to
merge segments until there are <= maxSegments
lucene segments left in this index. Merging
segments involves reading and rewriting all data
in all these segment files, potentially multiple
times, which is very I/O intensive and time
consuming. However, an index with fewer segments
can later be merged faster, and it can later be
queried faster once deployed to a live Solr
serving shard. Set maxSegments to 1 to optimize
the index for low query latency. In a nutshell, a
small maxSegments value trades indexing latency
for subsequently improved query latency. This can
be a reasonable trade-off for batch indexing
systems. (default: 1)
--fair-scheduler-pool STRING
Optional tuning knob that indicates the name of
the fair scheduler pool to submit jobs to. The
Fair Scheduler is a pluggable MapReduce scheduler
that provides a way to share large clusters. Fair
scheduling is a method of assigning resources to
jobs such that all jobs get, on average, an equal
share of resources over time. When there is a
single job running, that job uses the entire
cluster. When other jobs are submitted, task
slots that free up are assigned to the new jobs,
so that each job gets roughly the same amount of
CPU time. Unlike the default Hadoop scheduler,
which forms a queue of jobs, this lets short jobs
finish in reasonable time while not starving long
jobs. It is also an easy way to share a cluster
between multiple users. Fair sharing can also
work with job priorities - the priorities are
used as weights to determine the fraction of
total compute time that each job gets.
--dry-run Run in local mode and print documents to stdout
instead of loading them into Solr. This executes
the morphline in the client process (without
submitting a job to MR) for quicker turnaround
during early trial & debug sessions. (default:
false)
--log4j FILE Relative or absolute path to a log4j.properties
config file on the local file system. This file
will be uploaded to each MR task. Example:
/path/to/log4j.properties
--verbose, -v Turn on verbose output. (default: false)
--clear-index Will attempt to delete all entries in a solr
index before starting batch build. This is not
transactional so if the build fails the index
will be empty. (default: false)
--show-non-solr-cloud Also show options for Non-SolrCloud mode as part
of --help. (default: false)
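A trial run with --dry-run might be assembled as sketched below. The script only prints the command so it can be reviewed first. The jar path and the file names `indexer.xml` and `morphlines.conf` are placeholders, and whether further options such as --zk-host are needed for a dry run in a given setup is an open assumption.

```shell
#!/usr/bin/env bash
# Placeholder jar path; set JOB_JAR to the real hbase-indexer-mr-*-job.jar.
job_jar=${JOB_JAR:-hbase-indexer-mr-job.jar}

# Keep the invocation in an array so every argument stays a single word.
cmd=(
  hadoop --config /etc/hadoop/conf
  jar "$job_jar"
  --conf /etc/hbase/conf/hbase-site.xml
  --hbase-indexer-file indexer.xml
  --morphline-file morphlines.conf
  --dry-run
  --verbose
)

# Print the command for review; replace this line with "${cmd[@]}" to run it.
printf '%s ' "${cmd[@]}"; echo
```

Because --dry-run executes the morphline in the client process and prints documents to stdout, this is a quick way to check an indexer configuration before submitting a full job.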
Generic options supported are
--conf <configuration file>
specify an application configuration file
-D <property=value> use value for given property
--fs <local|namenode:port>
specify a namenode
--jt <local|jobtracker:port>
specify a job tracker
--files <comma separated list of files>
specify comma separated files to be copied to the
map reduce cluster
--libjars <comma separated list of jars>
specify comma separated jar files to include in
the classpath.
--archives <comma separated list of archives>
specify comma separated archives to be unarchived
on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Examples:
# (Re)index a table in GoLive mode based on a local indexer config file
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file indexer.xml \
--zk-host 127.0.0.1/solr \
--collection collection1 \
--go-live \
--log4j src/test/resources/log4j.properties
# (Re)index a table in GoLive mode using a local morphline-based indexer config file
# Also include extra library jar file containing JSON tweet Java parser:
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
--libjars /path/to/kite-morphlines-twitter-0.10.0.jar \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file src/test/resources/morphline_indexer_without_zk.xml \
--zk-host 127.0.0.1/solr \
--collection collection1 \
--go-live \
--morphline-file src/test/resources/morphlines.conf \
--output-dir hdfs://c2202.mycompany.com/user/$USER/test \
--overwrite-output-dir \
--log4j src/test/resources/log4j.properties
# (Re)index a table in GoLive mode
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file indexer.xml \
--zk-host 127.0.0.1/solr \
--collection collection1 \
--go-live \
--log4j src/test/resources/log4j.properties
# (Re)index a table with direct writes to SolrCloud
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file indexer.xml \
--zk-host 127.0.0.1/solr \
--collection collection1 \
--reducers 0 \
--log4j src/test/resources/log4j.properties
# (Re)index a table based on an indexer config stored in ZK
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-zk zk01 \
--hbase-indexer-name docindexer \
--go-live \
--log4j src/test/resources/log4j.properties
# MapReduce on Yarn - Pass custom JVM arguments
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapreduce.map.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
-D 'mapreduce.reduce.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
--hbase-indexer-zk zk01 \
--hbase-indexer-name docindexer \
--go-live \
--log4j src/test/resources/log4j.properties
# MapReduce on MR1 - Pass custom JVM arguments
HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
hadoop --config /etc/hadoop/conf \
jar hbase-indexer-mr-*-job.jar \
--conf /etc/hbase/conf/hbase-site.xml \
-D 'mapreduce.child.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
--hbase-indexer-zk zk01 \
--hbase-indexer-name docindexer \
--go-live \
--log4j src/test/resources/log4j.properties