Batch indexing to Solr using SparkApp framework in spark-submit

You can use spark-submit with your Spark job to batch index HDFS files into Solr. To do this, you need to create a class which implements the SparkApp.RDDProcessor interface. This enables ETL of large datasets into Solr, exploiting Spark's robust data processing capabilities.

To use the SparkApp framework, you need to create a Maven project with the spark-solr dependency.

<dependencies>
    <dependency>
        <groupId>com.lucidworks.spark</groupId>
        <artifactId>spark-solr</artifactId>
        <version>{latest_version}</version>
    </dependency>
</dependencies>

The project needs to contain at least one class that implements the SparkApp.RDDProcessor interface. The class can be written in either Java or Scala; this documentation uses a Java class to demonstrate how to use the framework.

The SparkApp.RDDProcessor has three functions that need to be overridden (a minimal skeleton is sketched after the list below):

  • getName()
  • getOptions()
  • run()
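Taken together, a minimal implementation might look like the following skeleton. The class name CSVIndexer and the import paths are assumptions made for illustration; the method bodies are filled in step by step in the sections below.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.spark.SparkConf;
import com.lucidworks.spark.SparkApp;

// Hypothetical class name, used for illustration only
public class CSVIndexer implements SparkApp.RDDProcessor {

    @Override
    public String getName() {
        // short name used to select this processor when submitting the job
        return "csv";
    }

    @Override
    public Option[] getOptions() {
        // application-specific command line options
        return new Option[]{};
    }

    @Override
    public int run(SparkConf conf, CommandLine cli) throws Exception {
        // read the CSV file, convert the lines to SolrInputDocuments, index them to Solr
        return 0;
    }
}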

getName()

The getName() function returns a string: the short name of the application. When running your spark-submit job, this is the name you pass as a parameter so the job can find your class.

public String getName() { return "csv"; }

getOptions()

In the getOptions() function you can specify parameters that are specific to your application. Certain parameters, for example zkHost, collection, and batchSize, are present by default; you do not need to specify those here.

public Option[] getOptions() {
     return new Option[]{
          OptionBuilder
               .withArgName("PATH").hasArgs()
               .isRequired(true)
               .withDescription("Path to the CSV file to index")
               .create("csvPath")
     };
}

run()

The run() function is the core of the application. It returns an integer and takes two parameters: a SparkConf instance and a CommandLine instance.
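Assuming the interface shape sketched earlier, the signature of the function looks roughly like this:

public int run(SparkConf conf, CommandLine cli) throws Exception {
    // the steps below build up the body of this method
}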

You can create a JavaSparkContext from the SparkConf instance, and use it to open the CSV file as a JavaRDD<String>:

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));

You now have to convert these Strings to SolrInputDocument objects and create a JavaRDD of them. To achieve this, the script uses a custom map function that splits each line on commas and adds the resulting fields to a SolrInputDocument. For this step to work, you have to specify the schema used in the CSV file in advance.
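The schema is simply an ordered array of Solr field names matching the CSV columns. The field names below are placeholders chosen for illustration; use the fields defined in your collection:

// Ordered Solr field names corresponding to the CSV columns (illustrative names)
String[] schema = {"id", "name", "description", "price"};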

JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
    @Override
    public SolrInputDocument call(String line) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        String[] row = line.split(",");

        // Skip rows that do not match the expected schema
        if (row.length != schema.length)
            return null;
        // Map each CSV column to the corresponding Solr field
        for (int i = 0; i < schema.length; i++) {
            doc.setField(schema[i], row[i]);
        }
        return doc;
    }
});
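Because the map function returns null for rows that do not match the schema, it can be useful to drop those entries before indexing. This is an optional safeguard, not part of the original example:

// Optional: remove the null entries produced for malformed rows
jrdd = jrdd.filter(new Function<SolrInputDocument, Boolean>() {
    @Override
    public Boolean call(SolrInputDocument doc) {
        return doc != null;
    }
});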

After this, the script reads the options it needs to perform indexing from the CommandLine instance:

String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
String collection = cli.getOptionValue("collection", "collection1");
int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));

Finally, it indexes data into the Solr cluster:

SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());

If the function completes successfully, it returns 0.
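Once the class is packaged into a jar together with its dependencies, the job can be launched with spark-submit. The command below is only a sketch: the jar name, file path, ZooKeeper address, and collection name are placeholders, and it assumes the SparkApp main class from spark-solr, which is given the short name returned by getName() followed by the application options:

# All jar names, paths, and connection values below are placeholders
spark-submit \
    --class com.lucidworks.spark.SparkApp \
    my-indexer-with-dependencies.jar \
    csv \
    -zkHost localhost:9983 \
    -collection collection1 \
    -csvPath /path/to/data.csv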