Batch indexing to Solr using the SparkApp framework with spark-submit
You can use spark-submit with your Spark job to batch index HDFS files into Solr. To do this, you need to create a class which implements the SparkApp.RDDProcessor interface. This allows you to ETL large datasets into Solr, exploiting Spark's robust data processing capabilities.
To use the SparkApp framework, you need to create a Maven project with the spark-solr dependency.
<dependencies>
  <dependency>
    <groupId>com.lucidworks.spark</groupId>
    <artifactId>spark-solr</artifactId>
    <version>{latest_version}</version>
  </dependency>
</dependencies>
This project needs to have at least one class that implements the SparkApp.RDDProcessor interface. The class can be written in either Java or Scala. This documentation uses a Java class to demonstrate how to use the framework.
The SparkApp.RDDProcessor interface has three functions which need to be overridden (a minimal stub of such a class follows this list):
getName()
getOptions()
run
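The stub below sketches the overall shape of such a class. The class name CSVIndexer is a hypothetical example, and the import path for SparkApp follows the spark-solr project layout, so verify it against the version you depend on; the sections below fill in each function.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.spark.SparkConf;

import com.lucidworks.spark.SparkApp;

public class CSVIndexer implements SparkApp.RDDProcessor {
    // Short name used on the spark-submit command line to select this application.
    public String getName() { return "csv"; }

    // Application-specific command-line options; this stub declares none yet.
    public Option[] getOptions() { return new Option[0]; }

    // Reads the input, converts it to SolrInputDocuments, and indexes them into Solr.
    public int run(SparkConf conf, CommandLine cli) { return 0; }
}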
getName()
The getName() function returns a string: the short name of the application. When running your spark-submit job, this is the name you pass as a parameter so that the job can find your class.
public String getName() { return "csv"; }
getOptions()
In the getOptions() function you can specify parameters that are specific to your application. Certain parameters, for example zkHost, collection, and batchSize, are present by default; you do not need to specify those here.
public Option[] getOptions() {
    return new Option[]{
        OptionBuilder
            .withArgName("PATH").hasArgs()
            .isRequired(true)
            .withDescription("Path to the CSV file to index")
            .create("csvPath")
    };
}
run
The run function is the core of the application. It returns an integer and takes two parameters: a SparkConf instance and a CommandLine instance.
You can create a JavaSparkContext from the SparkConf instance and use it to open the CSV file as a JavaRDD<String>:
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));
You now have to convert these Strings into SolrInputDocuments and create a JavaRDD of them. To achieve this, the script uses a custom map function which splits each line on commas and adds the resulting values to a SolrInputDocument. For this step to work, you have to specify the schema used in the CSV file in advance (a sketch of such a declaration follows the map function below).
JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
    @Override
    public SolrInputDocument call(String line) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        String[] row = line.split(",");
        if (row.length != schema.length)
            return null;
        for (int i = 0; i < schema.length; i++) {
            doc.setField(schema[i], row[i]);
        }
        return doc;
    }
});
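The schema array referenced in the map function is not provided by the framework; it simply lists the CSV column names in the order they appear in each row. It could be declared, for example, as a field of the class. The column names below are purely illustrative; keep in mind that anything the map function references is serialized by Spark and shipped to the executors.

// Illustrative CSV column names, in the order they appear in each row.
private String[] schema = new String[]{"id", "name", "description", "price"};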
After this, the script asks the CommandLine instance for the options it needs to perform the indexing:
String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
String collection = cli.getOptionValue("collection", "collection1");
int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));
Finally, it indexes data into the Solr cluster:
SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());
If the function completes successfully, it returns 0.
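Putting the pieces above together, a complete class might look like the following sketch. The class name CSVIndexer and the schema contents are illustrative, and the import paths for SparkApp and SolrSupport follow the spark-solr project layout at the time of writing; verify them against the version you use.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;

public class CSVIndexer implements SparkApp.RDDProcessor {

    // Illustrative CSV column names; replace them with the fields of your collection.
    private String[] schema = new String[]{"id", "name", "description", "price"};

    @Override
    public String getName() { return "csv"; }

    @Override
    public Option[] getOptions() {
        return new Option[]{
            OptionBuilder
                .withArgName("PATH").hasArgs()
                .isRequired(true)
                .withDescription("Path to the CSV file to index")
                .create("csvPath")
        };
    }

    @Override
    public int run(SparkConf conf, CommandLine cli) {
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));

        // Convert each CSV line into a SolrInputDocument.
        JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
            @Override
            public SolrInputDocument call(String line) throws Exception {
                SolrInputDocument doc = new SolrInputDocument();
                String[] row = line.split(",");
                if (row.length != schema.length)
                    return null;
                for (int i = 0; i < schema.length; i++) {
                    doc.setField(schema[i], row[i]);
                }
                return doc;
            }
        });

        // zkHost, collection, and batchSize are provided by the framework by default.
        String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
        String collection = cli.getOptionValue("collection", "collection1");
        int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));

        SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());
        return 0;
    }
}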