Create indexer Maven project
As a prerequisite to using the SparkApp framework, you need to create a Maven project with the Spark-Solr dependency and at least one class that implements the SparkApp.RDDProcessor interface.
You can write the implementing class in either Java or Scala. The examples in this section use Java.
- Create the indexer Maven project.
- Edit the pom.xml file and add the following spark-solr dependency:

      <dependencies>
        <dependency>
          <groupId>com.lucidworks.spark</groupId>
          <artifactId>spark-solr</artifactId>
          <version>[***latest version***]</version>
          <classifier>shaded</classifier>
        </dependency>
      </dependencies>

  Replace [***latest version***] with an actual version number:
  - To use Spark 2, replace it with 3.9.0.[***CDP VERSION***]-[***BUILD***]
  - To use Spark 3, replace it with 3.9.0.3000.[***CDS VERSION***]-[***BUILD***]
  For example:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>indexer</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <repositories>
          <repository>
            <id>cdh.repo</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            <name>Cloudera Repositories</name>
            <snapshots>
              <enabled>true</enabled>
            </snapshots>
          </repository>
        </repositories>

        <dependencies>
          <dependency>
            <groupId>com.lucidworks.spark</groupId>
            <artifactId>spark-solr</artifactId>
            <version>3.9.0.3000.3.4.7190.1-123</version>
            <classifier>shaded</classifier>
          </dependency>
        </dependencies>
      </project>
- Create a CSVIndexer.java file that implements the SparkApp.RDDProcessor interface in the main/java/com/lucidworks/spark directory.

  For example:

      package com.lucidworks.spark;

      import com.lucidworks.spark.SparkApp;
      import com.lucidworks.spark.util.SolrSupport;
      import shaded.apache.commons.cli.CommandLine;
      import shaded.apache.commons.cli.Option;
      import shaded.apache.commons.cli.OptionBuilder;
      import org.apache.solr.common.SolrInputDocument;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;

      public class CSVIndexer implements SparkApp.RDDProcessor {

          // Name under which the SparkApp framework looks up this processor
          @Override
          public String getName() {
              return "csv";
          }

          // Command-line options specific to this processor
          @Override
          public Option[] getOptions() {
              return new Option[]{
                  OptionBuilder
                      .withArgName("PATH").hasArgs()
                      .isRequired(true)
                      .withDescription("Path to the CSV file to index")
                      .create("csvPath")
              };
          }

          // Field names, in the order the columns appear in each CSV line
          private String[] schema = "vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount".split(",");

          @Override
          public int run(SparkConf conf, CommandLine cli) throws Exception {
              JavaSparkContext jsc = new JavaSparkContext(conf);
              JavaRDD<String> textFile = jsc.textFile(cli.getOptionValue("csvPath"));

              // Map each CSV line to a Solr document, one field per column
              JavaRDD<SolrInputDocument> jrdd = textFile.map(new Function<String, SolrInputDocument>() {
                  @Override
                  public SolrInputDocument call(String line) throws Exception {
                      SolrInputDocument doc = new SolrInputDocument();
                      String[] row = line.split(",");
                      // Lines whose column count does not match the schema produce null
                      if (row.length != schema.length)
                          return null;
                      for (int i = 0; i < schema.length; i++) {
                          doc.setField(schema[i], row[i]);
                      }
                      return doc;
                  }
              });

              // Fall back to default values when an option is not supplied
              String zkhost = cli.getOptionValue("zkHost", "localhost:9983");
              String collection = cli.getOptionValue("collection", "collection1");
              int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "100"));

              SolrSupport.indexDocs(zkhost, collection, batchSize, jrdd.rdd());
              return 0;
          }
      }
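The row-to-document mapping inside call() can be tried out without a Spark or Solr installation. The following is a minimal sketch of that logic only, not part of the indexer itself: the CsvRowMapper class, its toFields method, and the three-column schema are hypothetical names introduced for illustration, a plain Map stands in for SolrInputDocument, and Optional.empty() stands in for the null that the indexer returns for mismatched rows.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class CsvRowMapper {
    // Hypothetical three-column schema, for illustration only;
    // the indexer above uses the full list of trip fields.
    static final String[] SCHEMA = {"vendor_id", "pickup_datetime", "passenger_count"};

    // Maps one CSV line to field-name/value pairs, or returns an empty
    // Optional when the column count does not match the schema.
    static Optional<Map<String, String>> toFields(String line, String[] schema) {
        // As in the indexer, split(",") discards trailing empty strings,
        // so a line that ends in a comma yields fewer columns than expected.
        String[] row = line.split(",");
        if (row.length != schema.length) {
            return Optional.empty();
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < schema.length; i++) {
            fields.put(schema[i], row[i]);
        }
        return Optional.of(fields);
    }

    public static void main(String[] args) {
        // A well-formed row and a row with a missing column
        System.out.println(toFields("1,2015-01-01 00:00:00,2", SCHEMA));
        System.out.println(toFields("1,2", SCHEMA));
    }
}
```

Because the split is a plain comma split, quoted values that contain commas are also treated as extra columns and the row is rejected. Input with such values would likely need a real CSV parser before the mapping step.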
- Create a JAR file:

      mvn clean install

  The indexer.jar file is created.