
# BigSolr

BigSolr aims to provide a comprehensive set of Solr connectors for Apache Hadoop, Apache Spark, and other big data technologies built on Hadoop and Spark.

## Features

* Provides custom Hadoop APIs for accessing Solr servers
* Allows Apache Spark to read data from and write data to Solr servers through the Hadoop APIs
* Integration with Cascading/Scalding, Pig, Hive, etc. (planned; not yet supported)

## Requirements

* Apache Spark 1.1 or higher (1.2 is recommended)
* Apache Solr 4.10.x

## How to Build

The following Maven command creates `bigsolr-0.1.jar` in the `target` directory.

```
$ mvn clean compile package
```

## Prerequisites

Before running the Spark programs below, Solr has to be up and running in either StandAlone or SolrCloud mode.

```
# StandAlone/HttpServer mode:
$ bin/solr start

# SolrCloud (cloud) mode:
$ bin/solr -e cloud
```

For the following examples, create a collection/core (index) named "collection1" once the Solr server is up and running.
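In SolrCloud mode, one way to create the collection is the Collections API; a minimal sketch, assuming a node listening on the default port 8983 and a single uploaded config set:

```
$ curl 'http://localhost:8983/solr/admin/collections?action=CREATE&name=collection1&numShards=1&replicationFactor=1'
```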


## Using Spark Shell

*Note: check which Hadoop version your Spark distribution was built against. For Hadoop 2.3/2.4 or higher, follow the instructions for the New Hadoop API; if your Spark is built against Hadoop 1.x, follow the instructions for the old Hadoop API below.*

```
$ spark-shell --jars target/bigsolr-0.1.jar
```


### Reading with New Hadoop API (mapreduce)

```
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.NullWritable
scala> import org.bigsolr.hadoop.SolrInputFormat
scala> import org.bigsolr.hadoop.SolrRecord
```


For SolrCloud mode:

```
scala> val serverMode: String = "cloud"
scala> val serverUrl: String = "localhost:9983"
```

For StandAlone/HttpServer mode:

```
scala> val serverMode: String = "standalone"
scala> val serverUrl: String = "http://localhost:8983/solr"
```


```
scala> val collection: String = "collection1"
scala> val fields: String = "id,description"
scala> val queryStr: String = "description:*"

scala> var conf = new Configuration()
scala> conf.set("solr.query", queryStr)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.newAPIHadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => value.getSolrDocument() }
```

```
scala> rdds.count
scala> rdds.first
scala> rdds.first.getFieldValue("id")
scala> rdds.first.getFieldValue("description")
scala> rdds.first.getFieldValuesMap()
```
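The returned `SolrDocument` objects can be flattened into plain Scala values for downstream transformations; a minimal sketch, assuming both fields are single-valued strings:

```
scala> // Extract (id, description) pairs; toString handles the Object-typed field values
scala> val pairs = rdds.map(doc => (doc.getFieldValue("id").toString, doc.getFieldValue("description").toString))
scala> pairs.take(3)
```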


### Indexing with New Hadoop API (mapreduce)

```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord
scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapreduce.Job  // New Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new Configuration()
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

Example with MapWritable:

```
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List(m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

scala> val rdds1a = rdds1.map { e =>
     |   val record = new MapWritable()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.put(new Text("id"), new Text(id))
     |   record.put(new Text("description"), new Text(description))
     |   record.put(new Text("author"), new Text(author))
     |   (NullWritable.get, record)
     | }

scala> // Index with MapWritable
scala> rdds1a.saveAsNewAPIHadoopFile(
     |   "-",  // this path parameter is ignored
     |   classOf[NullWritable],
     |   classOf[MapWritable],
     |   classOf[SolrOutputFormat],
     |   conf
     | )
```

Example with SolrInputRecord (a wrapper for SolrInputDocument):

```
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List(m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

scala> val rdds2a = rdds2.map { e =>
     |   val record = new SolrInputRecord()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.setField("id", id)
     |   record.setField("description", description)
     |   record.setField("author", author)
     |   (NullWritable.get, record)
     | }

scala> // Index with SolrInputRecord
scala> rdds2a.saveAsNewAPIHadoopFile(
     |   "-",  // this path parameter is ignored
     |   classOf[NullWritable],
     |   classOf[SolrInputRecord],
     |   classOf[SolrOutputFormat],
     |   conf
     | )
```
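To confirm that the documents were indexed, you can query Solr directly over HTTP; a quick check, assuming the StandAlone server on its default port:

```
$ curl 'http://localhost:8983/solr/collection1/select?q=*:*&wt=json&indent=true'
```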



### Reading with Old Hadoop API (mapred)

```
scala> import org.apache.hadoop.mapred.JobConf
scala> import org.apache.hadoop.io.NullWritable
scala> import org.bigsolr.hadoop.SolrInputFormat
scala> import org.bigsolr.hadoop.SolrRecord
```


For SolrCloud mode:

```
scala> val serverMode: String = "cloud"
scala> val serverUrl: String = "localhost:9983"
```

For StandAlone/HttpServer mode:

```
scala> val serverMode: String = "standalone"
scala> val serverUrl: String = "http://localhost:8983/solr"
```


```
scala> val collection: String = "collection1"
scala> val fields: String = "id,description"
scala> val queryStr: String = "description:*"

scala> var conf = new JobConf(sc.hadoopConfiguration)
scala> conf.set("solr.query", queryStr)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.hadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => value.getSolrDocument() }
```

```
scala> rdds.count
scala> rdds.first
scala> rdds.first.getFieldValue("id")
scala> rdds.first.getFieldValue("description")
scala> rdds.first.getFieldValuesMap()
```


### Indexing with Old Hadoop API (mapred)

```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord
scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapred.JobConf  // Old Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new JobConf(sc.hadoopConfiguration)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

Example with MapWritable:

```
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List(m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

scala> val rdds1a = rdds1.map { e =>
     |   val record = new MapWritable()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.put(new Text("id"), new Text(id))
     |   record.put(new Text("description"), new Text(description))
     |   record.put(new Text("author"), new Text(author))
     |   (NullWritable.get, record)
     | }

scala> // Index with MapWritable
scala> rdds1a.saveAsHadoopFile(
     |   "-",  // this path parameter is ignored
     |   classOf[NullWritable],
     |   classOf[MapWritable],
     |   classOf[SolrOutputFormat],
     |   conf,
     |   None
     | )
```

Example with SolrInputRecord (a wrapper for SolrInputDocument):

```
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List(m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

scala> val rdds2a = rdds2.map { e =>
     |   val record = new SolrInputRecord()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.setField("id", id)
     |   record.setField("description", description)
     |   record.setField("author", author)
     |   (NullWritable.get, record)
     | }

scala> // Index with SolrInputRecord
scala> rdds2a.saveAsHadoopFile(
     |   "-",  // this path parameter is ignored
     |   classOf[NullWritable],
     |   classOf[SolrInputRecord],
     |   classOf[SolrOutputFormat],
     |   conf,
     |   None
     | )
```


## SparkSQL

### Scala API

```
$ spark-shell --jars target/bigsolr-0.1.jar
```


```
scala> import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
scala> import org.bigsolr.spark.solr._

scala> val rdds = sqlContext.query("id:*", "http://localhost:8983/solr", "standalone", "collection1", "id,description")
```

```
scala> rdds.count
scala> rdds.first
```
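The result can also be queried with SQL from the same shell; a sketch, assuming `query` returns a SchemaRDD (the Spark 1.1/1.2 SQL result type), with "solr_docs" as an arbitrary table name chosen for illustration:

```
scala> rdds.registerTempTable("solr_docs")  // "solr_docs" is a name chosen here, not fixed by the API
scala> sqlContext.sql("SELECT id, description FROM solr_docs").collect()
```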


### SQL API

```
$ spark-sql --jars target/bigsolr-0.1.jar
```


```
spark-sql> CREATE TEMPORARY TABLE solr
         > USING org.bigsolr.spark
         > OPTIONS (query "id:*", serverUrl "http://localhost:8983/solr", serverMode "standalone", collection "collection1", fields "id,description");
```

```
spark-sql> select description from solr;
```
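Any field listed in the `fields` option is available as a column, so projections and filters work as usual; whether predicates are pushed down to Solr or evaluated on the Spark side is not documented here, so assume the latter:

```
spark-sql> select id, description from solr where id = '1';
```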


## Using PySpark Shell
*Note: check which Hadoop version your Spark distribution was built against. For Hadoop 2.3/2.4 or higher, follow the instructions for the New Hadoop API; if your Spark is built against Hadoop 1.x, follow the instructions for the old Hadoop API below.*

```
$ pyspark --jars target/bigsolr-0.1.jar
```


### Reading from Solr with PySpark

For SolrCloud mode:

```
>>> conf = {"solr.server.url": "localhost:9983", "solr.server.mode": "cloud", "solr.server.collection": "collection1", "solr.query": "id:*", "solr.server.fields": "id,description"}
```

For StandAlone/HttpServer mode:

```
>>> conf = {"solr.server.url": "http://localhost:8983/solr", "solr.server.mode": "standalone", "solr.server.collection": "collection1", "solr.query": "id:*", "solr.server.fields": "id,description"}
```


```
>>> rdds = sc.hadoopRDD("org.bigsolr.hadoop.SolrInputFormat", "org.apache.hadoop.io.NullWritable", "org.bigsolr.hadoop.SolrRecord", conf=conf)

>>> rdds.count()

>>> import json
>>> results = rdds.collect()
>>> for r in results:
...     print json.loads(r[1])["id"]
...     print json.loads(r[1])["description"]
```


### Indexing (Saving) RDDs in Solr with PySpark

For SolrCloud mode:

```
>>> conf = {"solr.server.url": "localhost:9983", "solr.server.mode": "cloud", "solr.server.collection": "collection1", "solr.server.fields": "id,description"}
```

For StandAlone/HttpServer mode:

```
>>> conf = {"solr.server.url": "http://localhost:8983/solr", "solr.server.mode": "standalone", "solr.server.collection": "collection1", "solr.server.fields": "id,description"}
```


```
>>> m1 = (None, {"id": "1", "description": "apple orange New York", "author": "John"})
>>> m2 = (None, {"id": "2", "description": "apple peach San Diego", "author": "Kevin"})
>>> data = [m1, m2]
>>> rdds = sc.parallelize(data)

>>> rdds.saveAsHadoopFile("-", "org.bigsolr.hadoop.SolrOutputFormat", "org.apache.hadoop.io.NullWritable", "org.apache.hadoop.io.MapWritable", conf=conf)
```
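To verify the save, read the documents back using the reading configuration from the previous section; the extra `solr.query` key is the only difference from the indexing conf:

```
>>> read_conf = dict(conf)
>>> read_conf["solr.query"] = "id:*"  # add a query to the indexing conf
>>> check = sc.hadoopRDD("org.bigsolr.hadoop.SolrInputFormat", "org.apache.hadoop.io.NullWritable", "org.bigsolr.hadoop.SolrRecord", conf=read_conf)
>>> check.count()
```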




## License
This software is available under the [Apache License, Version 2.0](LICENSE.txt).


## Reporting Bugs
Please use GitHub to report feature requests or bugs.  
