Indexing Data Using Spark-Solr Connector

Create indexer Maven project

As a prerequisite to using the SparkApp framework, you need to create a Maven project with the Spark-Solr dependency and at least one class that implements the SparkApp.RDDProcessor interface.

You can write the class in either Java or Scala. The examples show an implementation in Java.

  1. Create the indexer Maven project.
  2. Edit the pom.xml file and add the following spark-solr dependency:
    <!-- spark-solr dependency; the "shaded" classifier selects the shaded variant of the artifact. -->
    <dependencies>
            <dependency>
                <groupId>com.lucidworks.spark</groupId>
                <artifactId>spark-solr</artifactId>
                <!-- {latest_version} is a placeholder: substitute a real spark-solr version. -->
                <version>{latest_version}</version>
                <classifier>shaded</classifier>
            </dependency>
        </dependencies>
    
    Replace {latest_version} with an actual version number.
    For example:
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Example POM for the indexer project: Java 8, the Cloudera repository, and the shaded spark-solr artifact. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>indexer</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
        <repositories>
          <repository>
            <id>cdh.repo</id>
            <!-- Must be the repository ROOT: Maven appends the groupId/artifactId/version
                 path (com/lucidworks/spark/spark-solr/...) to this URL when resolving. -->
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            <name>Cloudera Repositories</name>
            <snapshots>
              <enabled>true</enabled>
            </snapshots>
          </repository>
        </repositories>
    
        <dependencies>
            <dependency>
                <groupId>com.lucidworks.spark</groupId>
                <artifactId>spark-solr</artifactId>
                <version>3.9.0.7.2.2.0-218</version>
                <classifier>shaded</classifier>
            </dependency>
        </dependencies>
    
    </project>
  3. Create a CSVIndexer.java file that implements the SparkApp.RDDProcessor interface.
    For example:
    import com.lucidworks.spark.SparkApp;
    import com.lucidworks.spark.util.SolrSupport;
    import shaded.apache.commons.cli.CommandLine;
    import shaded.apache.commons.cli.Option;
    import shaded.apache.commons.cli.OptionBuilder;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    
    /**
     * Spark-Solr processor that indexes the rows of a CSV file into a Solr collection.
     *
     * <p>Each input line is split on commas and its fields are stored under the column
     * names listed in {@code SCHEMA}. Lines whose field count differs from the schema are
     * skipped. The parsing is deliberately simple: quoted fields containing commas are not
     * supported, and a header line with the right number of columns is indexed like any
     * other row.
     */
    public class CSVIndexer implements SparkApp.RDDProcessor {
        /** Column names, in file order, used as Solr field names. */
        private static final String[] SCHEMA = "vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount".split(",");
    
        /** Returns the processor name, {@code "csv"}. */
        @Override
        public String getName() {
            return "csv";
        }
    
        /**
         * Declares the processor-specific command-line options.
         *
         * @return a single required option, {@code csvPath}, holding the path of the CSV file to index
         */
        @Override
        public Option[] getOptions() {
            return new Option[]{
                    OptionBuilder
                            .withArgName("PATH").hasArgs()
                            .isRequired(true)
                            .withDescription("Path to the CSV file to index")
                            .create("csvPath")
            };
        }
    
        /**
         * Reads the CSV file named by {@code csvPath}, converts every well-formed line into a
         * {@link SolrInputDocument} and sends the documents to Solr in batches.
         *
         * <p>Besides {@code csvPath}, the options {@code zkHost} (default
         * {@code localhost:9983}), {@code collection} (default {@code collection1}) and
         * {@code batchSize} (default {@code 100}) are read from the command line.
         *
         * @param conf Spark configuration used to create the context
         * @param cli  parsed command line
         * @return {@code 0} once indexing has completed
         * @throws Exception if an option value is invalid or the Spark job fails
         */
        @Override
        public int run(SparkConf conf, CommandLine cli) throws Exception {
            // Resolve and validate all options first so that a bad value fails
            // before a Spark context is created.
            String csvPath = cli.getOptionValue("csvPath");
            String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
            String collection = cli.getOptionValue("collection", "collection1");
            int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));
    
            // JavaSparkContext is Closeable; closing it stops the context even when indexing throws.
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                JavaRDD<SolrInputDocument> docs = jsc.textFile(csvPath)
                        .map(CSVIndexer::toDocument)
                        // Malformed lines map to null; drop them so no null reaches the indexer.
                        .filter(doc -> doc != null);
    
                SolrSupport.indexDocs(zkhost, collection, batchSize, docs.rdd());
            }
    
            return 0;
        }
    
        /**
         * Converts one CSV line into a Solr document.
         *
         * <p>Static on purpose: the Spark closure then does not need to capture the
         * enclosing {@code CSVIndexer} instance.
         *
         * @param line a single line of the input file
         * @return the document, or {@code null} if the line does not have exactly one
         *         field per schema column
         */
        private static SolrInputDocument toDocument(String line) {
            // Limit -1 keeps trailing empty fields; the default split would drop them and
            // make rows that end in blank columns look too short.
            String[] row = line.split(",", -1);
            if (row.length != SCHEMA.length) {
                return null;
            }
    
            SolrInputDocument doc = new SolrInputDocument();
            for (int i = 0; i < SCHEMA.length; i++) {
                doc.setField(SCHEMA[i], row[i]);
            }
            return doc;
        }
    }
    
    
  4. Create a JAR file:
    mvn clean install
    The JAR file is created in the project's target directory (indexer-1.0-SNAPSHOT.jar for the example POM above).