This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Merge pull request #80 from Stratio/master
Merged branch-0.10 and master
pfcoperez committed Jan 8, 2016
2 parents 8a769ec + 187697c commit 21c00f3
Showing 10 changed files with 91 additions and 20 deletions.
23 changes: 21 additions & 2 deletions README.md
@@ -15,12 +15,31 @@ If you are using this Data Source, feel free to briefly share your experience by
| 0.8.0 | 1.2.1 | 3.0.x |


## How to use Spark-MongoDB ##
## Requirements ##

This library requires Apache Spark, Scala 2.10 or Scala 2.11, and Casbah 2.8.X.

The requirements of this project can be found in the [about](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/about.rst) document.
#### Latest compatible versions ####

| spark-MongoDB | Apache Spark | MongoDB |
| ------------- | ------------- | -------- |
| 0.10.x | 1.5.x | 3.0.x |
| 0.8.2 - 0.9.2 | 1.4.0 | 3.0.x |
| 0.8.1 | 1.3.0 | 3.0.x |
| 0.8.0 | 1.2.1 | 3.0.x |


## How to use Spark-MongoDB ##

There is also a [First Steps](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst) document with some simple examples:

- [Using the library](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#using-the-library)
- [Configuration parameters](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#configuration-parameters)
- [Examples](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#examples)
- [Scala API](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#scala-api)
- [Python API](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#python-api)
- [R API](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#r-api)



# License #
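For orientation between the file diffs: a minimal read sketch built on the MongodbConfigBuilder usage documented in First_Steps.rst (changed below). The connection values are placeholders, an existing `sqlContext` is assumed, and the `fromMongoDB` call is an assumption about the datasource's implicit Scala API rather than something shown in this diff.

```scala
import com.stratio.datasource.mongodb._
import MongodbConfig._

// Placeholder connection settings; adjust host, database and collection.
val builder = MongodbConfigBuilder(Map(
  Host -> List("localhost:27017"),
  Database -> "highschool",
  Collection -> "students",
  SamplingRatio -> 1.0))

// Assumes an existing SQLContext named sqlContext; fromMongoDB is expected to be
// provided by the implicits imported above (an assumption, not shown in this diff).
val students = sqlContext.fromMongoDB(builder.build)
students.registerTempTable("students")
sqlContext.sql("SELECT * FROM students LIMIT 10").show()
```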
17 changes: 16 additions & 1 deletion doc/src/site/sphinx/First_Steps.rst
@@ -3,6 +3,21 @@ First steps

We are going to introduce how to use our MongoDB datasource for Apache Spark.

Table of Contents
*****************

- `Using the library <#using-the-library>`__

- `Configuration parameters <#configuration-parameters>`__

- `Examples <#examples>`__

- `Scala API <#scala-api>`__
- `Python API <#python-api>`__
- `R API <#r-api>`__



Using the library
=================

@@ -126,7 +141,7 @@ To save a DataFrame in MongoDB you should use the saveToMongodb() function as fo
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.datasource.mongodb._
import MongodbConfig._
val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "highschool", Collection -> "students", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal, SplitKey -> "_id", SplitSize -> 8, SplitKey -> "_id"))
val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "highschool", Collection -> "students", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal, SplitSize -> 8, SplitKey -> "_id"))
dataFrame.saveToMongodb(saveConfig.build)


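One note on the configuration fix above: duplicate keys in a Scala Map literal keep only the last binding, so the repeated `SplitKey -> "_id"` entry that this hunk removes never changed the resulting configuration. A standalone check:

```scala
// Duplicate keys in a Map literal collapse to the last binding.
val withDuplicate    = Map("SplitKey" -> "_id", "SplitSize" -> 8, "SplitKey" -> "_id")
val withoutDuplicate = Map("SplitSize" -> 8, "SplitKey" -> "_id")

println(withDuplicate == withoutDuplicate)  // prints: true
```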
6 changes: 3 additions & 3 deletions doc/src/site/sphinx/PoweredBy.rst
@@ -1,5 +1,5 @@
============
Powered By
Powered by
============

If you are using this Data Source, feel free to briefly share your experience by Pull Request.
@@ -9,8 +9,8 @@ If you are using this Data Source, feel free to briefly share your experience by
Companies
***************

* `Stratio Connectors <http://stratio.com>`_ : Description of use case.

- `Stratio platform <http://www.stratio.com/>`_:
- `Crossdata <https://github.com/Stratio/crossdata>`_: Crossdata uses spark-mongodb datasource as a piece of the MongoDB connector.



4 changes: 2 additions & 2 deletions spark-mongodb-examples/pom.xml
@@ -40,8 +40,8 @@

<properties>
<scala.binary.version>2.10</scala.binary.version>
<mongodb.datasource.version>0.10.0-SNAPSHOT</mongodb.datasource.version>
<spark.version>1.5.1</spark.version>
<mongodb.datasource.version>0.10.1</mongodb.datasource.version>
<spark.version>1.5.2</spark.version>
</properties>

<dependencies>
@@ -123,8 +123,34 @@ object MongodbRelation
def pruneSchema(
schema: StructType,
requiredColumns: Array[String]): StructType =
pruneSchema(schema, requiredColumns.map(_ -> None): Array[(String, Option[Int])])


/**
* Prunes the whole schema to fit the required columns, taking into account
* nested columns (array elements) referenced in the Spark SQL statement.
* @param schema Whole field projection schema.
* @param requiredColumnsWithIndex Required fields in the statement, including an optional index within a field for random access.
* @return A new pruned schema
*/
private[this] def pruneSchema(
schema: StructType,
requiredColumnsWithIndex: Array[(String, Option[Int])]): StructType = {
val name2sfield: Map[String, StructField] = schema.fields.map(f => f.name -> f).toMap
StructType(
requiredColumns.flatMap(column =>
schema.fields.find(_.name == column)))
requiredColumnsWithIndex.flatMap {
case (colname, None) => name2sfield.get(colname)
case (colname, Some(idx)) => name2sfield.get(colname) collect {
case field @ StructField(name, ArrayType(et,_), nullable, _) =>
val mdataBuilder = new MetadataBuilder
//Non-functional area
mdataBuilder.putLong("idx", idx.toLong)
mdataBuilder.putString("colname", name)
//End of non-functional area
StructField(s"$name[$idx]", et, true, mdataBuilder.build())
}
}
)
}

}
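To make the new overload above concrete, here is a small standalone sketch of the same pruning idea, using hypothetical column names (`name`, `grades`) rather than anything from this repository: a request for element 0 of an array column becomes an element-typed field whose metadata records the index and the original column name.

```scala
import org.apache.spark.sql.types._

// Simplified re-statement of the pruning logic for a single requested column.
def pruneField(schema: StructType, colname: String, idx: Option[Int]): Option[StructField] = {
  val byName = schema.fields.map(f => f.name -> f).toMap
  (byName.get(colname), idx) match {
    case (found @ Some(_), None) => found
    case (Some(StructField(name, ArrayType(elementType, _), _, _)), Some(i)) =>
      val mdata = new MetadataBuilder().putLong("idx", i.toLong).putString("colname", name).build()
      Some(StructField(s"$name[$i]", elementType, nullable = true, mdata))
    case _ => None   // unknown column, or an index requested on a non-array field
  }
}

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("grades", ArrayType(DoubleType))))

// Yields a StructField named "grades[0]" of DoubleType carrying idx/colname metadata,
// which MongodbRowConverter.recordAsRow (changed later in this commit) reads back.
pruneField(schema, "grades", Some(0))
```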
@@ -139,8 +139,10 @@ class MongodbReader(
case Not(filter) =>
filtersToDBObject(Array(filter), true)
}

queryBuilder.get
}

filtersToDBObject(filters)
}

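The hunk above completes the pushed-down filter translation with the `Not` case. As a rough, simplified illustration of the general idea (not the datasource's actual mapping), a Spark `Filter` can be turned into a Casbah query document like this:

```scala
import com.mongodb.casbah.Imports._
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, Not}

// Simplified sketch: the real implementation also handles In, ranges, string
// patterns and nested negation via the queryBuilder shown above.
def filterToQuery(filter: Filter): DBObject = filter match {
  case EqualTo(attribute, value)      => MongoDBObject(attribute -> value)
  case GreaterThan(attribute, value)  => MongoDBObject(attribute -> MongoDBObject("$gt" -> value))
  case Not(EqualTo(attribute, value)) => MongoDBObject(attribute -> MongoDBObject("$ne" -> value))
  case _                              => MongoDBObject()  // unsupported filters select everything
}

println(filterToQuery(Not(EqualTo("age", 30))))  // roughly { "age" : { "$ne" : 30 } }
```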
@@ -90,6 +90,7 @@ trait JsonSupport {
value match {
case value: java.lang.Integer => value.asInstanceOf[Int].toLong
case value: java.lang.Long => value.asInstanceOf[Long]
case value: java.lang.Double => value.asInstanceOf[Double].toLong
}
}

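The `java.lang.Double` case added above keeps a LongType column from failing with a MatchError when the stored value comes back as a floating-point number. A minimal standalone version of that coercion (the method name and sample value are illustrative):

```scala
// Numbers read from Mongo/JSON may arrive as Integer, Long or Double instances;
// a LongType column must always yield a Long, so the Double case truncates.
def toLong(value: Any): Long = value match {
  case i: java.lang.Integer => i.intValue.toLong
  case l: java.lang.Long    => l.longValue
  case d: java.lang.Double  => d.doubleValue.toLong  // the case this commit adds
  // any other runtime type still fails with a MatchError, as before
}

println(toLong(java.lang.Double.valueOf(42.0)))  // prints: 42
```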
@@ -19,8 +19,8 @@ import com.mongodb.casbah.Imports._
import com.stratio.datasource.schema.RowConverter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType, MapType}
import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, GenericRow}
import org.apache.spark.sql.types._

import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer
@@ -30,8 +30,8 @@ import scala.collection.mutable.ArrayBuffer
* from DBObject to Row and vice versa
*/
object MongodbRowConverter extends RowConverter[DBObject]
with JsonSupport
with Serializable {
with JsonSupport
with Serializable {

/**
*
@@ -64,14 +64,22 @@
* @return The converted row
*/
def recordAsRow(
json: Map[String, AnyRef],
schema: StructType): Row = {
json: Map[String, AnyRef],
schema: StructType): Row = {

val values: Seq[Any] = schema.fields.map {
case StructField(name, et, _, mdata)
if(mdata.contains("idx") && mdata.contains("colname")) =>
val colName = mdata.getString("colname")
val idx = mdata.getLong("idx").toInt
json.get(colName).flatMap(v => Option(v)).map(toSQL(_, ArrayType(et, true))).collect {
case elemsList: Seq[_] if((0 until elemsList.size) contains idx) => elemsList(idx)
} orNull
case StructField(name, dataType, _, _) =>
json.get(name).flatMap(v => Option(v)).map(
toSQL(_, dataType)).orNull
}
Row.fromSeq(values)
new GenericRowWithSchema(values.toArray, schema)
}

/**
@@ -146,4 +154,4 @@
dBObject.seq.toMap
}

}
}
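Taking the two changes to this converter together (the metadata-driven array-element lookup and the switch to `GenericRowWithSchema`), a standalone sketch with hypothetical data shows the intended behaviour; the column names and the document are made up for illustration, and the toSQL type conversion is omitted.

```scala
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

// Schema produced by the pruning step: "grades[0]" carries idx/colname metadata.
val mdata = new MetadataBuilder().putLong("idx", 0L).putString("colname", "grades").build()
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("grades[0]", DoubleType, nullable = true, mdata)))

// A MongoDB document already converted to a Scala Map (hypothetical values).
val document: Map[String, AnyRef] = Map(
  "name" -> "jane",
  "grades" -> Seq(9.5, 7.0))

// Same shape as the new recordAsRow: metadata-tagged fields look up the whole
// array under its original column name and keep only the requested element.
val values: Array[Any] = schema.fields.map {
  case StructField(_, _, _, md) if md.contains("idx") && md.contains("colname") =>
    val idx = md.getLong("idx").toInt
    document.get(md.getString("colname")).collect {
      case elems: Seq[_] if elems.indices contains idx => elems(idx)
    }.orNull
  case StructField(name, _, _, _) =>
    document.get(name).orNull
}

// GenericRowWithSchema keeps the schema attached, so fields are accessible by name.
val row = new GenericRowWithSchema(values, schema)
println(row.getAs[Double]("grades[0]"))  // prints: 9.5
```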
2 changes: 1 addition & 1 deletion spark-mongodb_2.10/pom.xml
@@ -33,7 +33,7 @@
<sonar.projectBaseDir>spark-mongodb</sonar.projectBaseDir>
<scala.test.version>2.2.5</scala.test.version>
<scala.mock.version>3.2.1</scala.mock.version>
<spark.version>1.5.1</spark.version>
<spark.version>1.5.2</spark.version>
<casbah.version>2.8.0</casbah.version>
</properties>
<dependencies>
2 changes: 1 addition & 1 deletion spark-mongodb_2.11/pom.xml
@@ -33,7 +33,7 @@
<sonar.projectBaseDir>spark-mongodb</sonar.projectBaseDir>
<scala.test.version>2.2.5</scala.test.version>
<scala.mock.version>3.2.1</scala.mock.version>
<spark.version>1.5.1</spark.version>
<spark.version>1.5.2</spark.version>
<casbah.version>2.8.0</casbah.version>
</properties>
<dependencies>
