Batch indexing to Solr using SparkApp framework

The Maven project presented here is provided as an example of using the Spark-Solr connector to batch-index data from a CSV file stored in HDFS into a Solr collection.

The Spark-Solr connector framework comes bundled with Cloudera Search. It enables the extraction, transformation, and loading (ETL) of large datasets to Solr. You can use spark-submit with a Spark job to batch-index HDFS files into Solr. To do this, you need to create a class which implements the SparkApp.RDDProcessor interface.

To use the SparkApp framework, you must create a Maven project with the spark-solr dependency.

<dependencies>
    <dependency>
        <groupId>com.lucidworks.spark</groupId>
        <artifactId>spark-solr</artifactId>
        <version>{latest_version}</version>
    </dependency>
</dependencies>

The project must contain, at a minimum, one class that implements the SparkApp.RDDProcessor interface. The class can be written in either Java or Scala. This documentation uses a Java class to demonstrate how to use the framework.

The SparkApp.RDDProcessor interface has three functions that need to be overridden:

  • getName()
  • getOptions()
  • run()
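
A minimal skeleton of such a class might look like the following. The class name CSVIndexer is only illustrative, and the method bodies are placeholders that the sections below fill in:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.spark.SparkConf;
import com.lucidworks.spark.SparkApp;

public class CSVIndexer implements SparkApp.RDDProcessor {

    // Short name used to select this application on the spark-submit command line
    public String getName() { return "csv"; }

    // Application-specific command line options (see getOptions() below)
    public Option[] getOptions() { return new Option[]{}; }

    // Core of the application: reads the CSV file and indexes it into Solr (see run() below)
    public int run(SparkConf conf, CommandLine cli) throws Exception { return 0; }
}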

getName()

The getName() function returns the short name of the application as a string. When running your spark-submit job, this is the name you pass as a parameter so that the job can find your class (a complete spark-submit example is shown at the end of this section).

public String getName() { return "csv"; }

getOptions()

In the getOptions() function you can specify parameters that are specific to your application. Certain parameters, for example zkHost, collection, or batchSize, are present by default; you do not need to specify those here. The options are built with the Apache Commons CLI OptionBuilder class.

public Option[] getOptions() {
     return new Option[]{
          OptionBuilder
               .withArgName("PATH").hasArgs()
               .isRequired(true)
               .withDescription("Path to the CSV file to index")
               .create("csvPath")
     };
}
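
With this definition, the path to the CSV file is supplied on the spark-submit command line as a required application argument, for example (the HDFS path is a placeholder):

-csvPath hdfs:///user/example/data/books.csv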

run()

The run function is the core of the application. It returns an integer and takes two parameters: a SparkConf and a CommandLine.

You can create a JavaSparkContext object from the SparkConf parameter and use it to read the CSV file into a JavaRDD<String>:

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));

You now have to convert these String values into SolrInputDocument objects and create a JavaRDD<SolrInputDocument>. To achieve this, the example uses a custom map function that splits each line on commas and adds the resulting fields to a SolrInputDocument. You must specify the schema used in the CSV file in advance.
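
The schema array used in the map function below is not provided by the framework; it is an array of the CSV column names, in the order they appear in each line, declared as a field of the class. The field names shown here are only illustrative:

// Column names of the CSV file, in order; they must match fields defined in the Solr collection's schema
private static final String[] schema = new String[]{"id", "title", "author", "rating"};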

JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
    @Override
    public SolrInputDocument call(String line) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        String[] row = line.split(",");

        if (row.length != schema.length)
            return null;
        for (int i = 0; i < schema.length; i++) {
            doc.setField(schema[i], row[i]);
        }
        return doc;
    }
});
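
Note that lines which do not contain the expected number of columns are mapped to null. Depending on your data, you may want to drop such entries before indexing, for example with a filter like this:

// Drop the null entries produced for malformed lines
jrdd = jrdd.filter(new Function<SolrInputDocument, Boolean>() {
    @Override
    public Boolean call(SolrInputDocument doc) {
        return doc != null;
    }
});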

Next, the script reads the options needed for indexing from the CommandLine instance:

String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
String collection = cli.getOptionValue("collection", "collection1");
int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));

Finally, the job indexes data into the Solr cluster:

SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());

If the job completes successfully, the run function returns 0.
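
Once the class is compiled and packaged into a jar, you can launch the job with spark-submit through the spark-solr SparkApp entry point, passing the short name returned by getName() followed by the application options. The jar names, paths, and ZooKeeper address below are placeholders that depend on your environment:

spark-submit \
  --class com.lucidworks.spark.SparkApp \
  --jars /path/to/spark-solr-shaded.jar \
  /path/to/csv-indexer-1.0.jar \
  csv \
  -zkHost zk01.example.com:2181/solr \
  -collection collection1 \
  -csvPath hdfs:///user/example/data/books.csv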