Batch indexing to Solr using SparkApp framework
The Maven project presented here is an example of using the Spark-Solr connector to batch-index data from a CSV file in HDFS into a Solr collection.
The Spark-Solr connector framework comes bundled with Cloudera Search. It enables Extraction,
Transformation, and Loading (ETL) of large datasets to Solr. You can use spark-submit with a
Spark job to batch-index HDFS files into Solr. To do this, create a class that
implements the SparkApp.RDDProcessor interface.
To use the SparkApp framework, you must create a Maven project with the spark-solr dependency.
<dependencies>
  <dependency>
    <groupId>com.lucidworks.spark</groupId>
    <artifactId>spark-solr</artifactId>
    <version>{latest_version}</version>
  </dependency>
</dependencies>
This project needs at least one class that implements the SparkApp.RDDProcessor
interface. This class can be written in either Java or Scala. This
documentation uses a Java class to demonstrate how to use the framework.
The SparkApp.RDDProcessor interface has three functions that must be overridden:
getName()
getOptions()
run
getName()
The getName() function returns the short name of the application as a
string. When you run your spark-submit job, you pass this name as a parameter
so that the job can find your class.
public String getName() {
  return "csv";
}
getOptions()
In the getOptions() function you can specify parameters that are specific
to your application. Certain parameters, for example zkHost, collection, or
batchSize, are present by default. You do not need to specify those here.
public Option[] getOptions() {
  return new Option[]{
    OptionBuilder
      .withArgName("PATH").hasArgs()
      .isRequired(true)
      .withDescription("Path to the CSV file to index")
      .create("csvPath")
  };
}
run
The run function is the core of the application. It returns an integer
and takes two parameters, SparkConf and CommandLine.
You can create a JavaSparkContext instance from the SparkConf parameter,
and use it to open the CSV file as a JavaRDD<String>:
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));
You now have to convert these String values to SolrInputDocument objects and
create a JavaRDD<SolrInputDocument>. To achieve this, the example uses a custom
map function that splits each line of the CSV file on commas and adds the
values to a SolrInputDocument. You must specify the schema used in the CSV
file in advance.
// schema is assumed to be a String[] of CSV column names defined elsewhere in the class.
JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
  @Override
  public SolrInputDocument call(String line) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    String[] row = line.split(",");
    // Lines whose column count does not match the schema yield null.
    if (row.length != schema.length)
      return null;
    for (int i = 0; i < schema.length; i++) {
      doc.setField(schema[i], row[i]);
    }
    return doc;
  }
});
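The comma-splitting step can be tried out without Spark or Solr. The following is a minimal sketch using only the Java standard library. The class name CsvRowSketch, the toFields helper, and the three-column schema are hypothetical and are not part of the connector. A plain Map stands in for SolrInputDocument. Unlike the map function above, this sketch passes -1 as the limit to split so that trailing empty columns are kept, and it returns an empty Optional instead of null for rows that do not match the schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class CsvRowSketch {

    // Hypothetical schema for illustration; replace with the columns of your CSV file.
    static final String[] SCHEMA = {"id", "name", "comment"};

    /**
     * Maps one CSV line to field name -> value pairs.
     * Returns Optional.empty() when the column count does not match the schema.
     */
    static Optional<Map<String, String>> toFields(String line, String[] schema) {
        // The -1 limit keeps trailing empty columns; split(",") alone would drop them.
        String[] row = line.split(",", -1);
        if (row.length != schema.length) {
            return Optional.empty();
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < schema.length; i++) {
            fields.put(schema[i], row[i]);
        }
        return Optional.of(fields);
    }

    public static void main(String[] args) {
        // A well-formed row with three columns.
        System.out.println(toFields("1,widget,blue", SCHEMA));
        // The empty last column is kept, so the row still matches the schema.
        System.out.println(toFields("2,gadget,", SCHEMA));
        // A quoted comma is split like any other comma, so this row has four columns and is rejected.
        System.out.println(toFields("3,\"a, b\",x", SCHEMA));
    }
}
```

As the last call shows, splitting on commas does not understand quoted fields. If your data can contain commas inside values, a dedicated CSV parser is likely a better fit than String.split.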
After this, the example reads the options needed for indexing from the
CommandLine instance:
String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
String collection = cli.getOptionValue("collection", "collection1");
int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));
Finally, the job indexes data into the Solr cluster:
SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());
If the call succeeds, the run function returns 0.
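To get a feel for what the batchSize option means, the sketch below groups a list of items into batches using only the Java standard library. It assumes that batchSize caps the number of documents sent to Solr in one update request. It illustrates the idea only and is not the implementation of SolrSupport.indexDocs. The class name BatchingSketch and the toBatches helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchingSketch {

    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sublist so that each batch is independent of the input list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Strings stand in for documents in this illustration.
        List<String> docs = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
        for (List<String> batch : toBatches(docs, 2)) {
            System.out.println("would send " + batch.size() + " documents: " + batch);
        }
    }
}
```

Under this assumption, a larger batchSize means fewer, larger requests, while a smaller value means more frequent, smaller ones.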