Skip to content

Commit

Permalink
Handle empty partition iterators
Browse files Browse the repository at this point in the history
Empty edge partitions sometimes appear in the output of zipPartitions
for unknown reasons, causing calls to Iterator#next to fail. This commit
checks these cases, handles them by returning an empty iterator, and
logs an error if this would cause GraphX to drop a corresponding
non-empty partition.

Resolves amplab/graphx#52.

(cherry picked from commit 2265c87c387979c94275e673a16527f582b2f38a)
  • Loading branch information
ankurdave committed Apr 22, 2014
1 parent 0f87e6a commit 354d154
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 132 deletions.
32 changes: 25 additions & 7 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.graphx

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.Logging
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd.RDD
Expand All @@ -30,7 +31,8 @@ import org.apache.spark.storage.StorageLevel
*/
class EdgeRDD[@specialized ED: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD)))
with Logging {

partitionsRDD.setName("EdgeRDD")

Expand All @@ -46,7 +48,11 @@ class EdgeRDD[@specialized ED: ClassTag](

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
p.next._2.iterator.map(_.copy())
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
Expand All @@ -70,8 +76,12 @@ class EdgeRDD[@specialized ED: ClassTag](
private[graphx] def mapEdgePartitions[ED2: ClassTag](
f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}

Expand Down Expand Up @@ -108,9 +118,17 @@ class EdgeRDD[@specialized ED: ClassTag](
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
if (thisIter.hasNext && otherIter.hasNext) {
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
} else {
if (thisIter.hasNext != otherIter.hasNext) {
logError("innerJoin: Dropped non-empty edge partition from `%s`".format(
if (thisIter.hasNext) "this" else "other"))
}
Iterator.empty
}
})
}

Expand Down
143 changes: 84 additions & 59 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.Logging
import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
Expand Down Expand Up @@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val edges: EdgeRDD[ED],
@transient val routingTable: RoutingTable,
@transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] with Serializable {
extends Graph[VD, ED] with Serializable with Logging {

/** Default constructor is provided to support serialization */
protected def this() = this(null, null, null, null)
Expand All @@ -58,9 +59,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edTag = classTag[ED]
edges.partitionsRDD.zipPartitions(
replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
val (pid, ePart) = ePartIter.next()
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
if (ePartIter.hasNext && vPartIter.hasNext) {
val (pid, ePart) = ePartIter.next()
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
} else {
if (ePartIter.hasNext != vPartIter.hasNext) {
logError("triplets: Dropped non-empty %s partition".format(
if (ePartIter.hasNext) "edge" else "vertex"))
}
Iterator.empty
}
}
}

Expand Down Expand Up @@ -131,22 +140,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val newEdgePartitions =
edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
(ePartIter, vTableReplicatedIter) =>
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vTableReplicatedIter.next()
assert(!vTableReplicatedIter.hasNext)
assert(ePid == vPid)
val et = new EdgeTriplet[VD, ED]
val inputIterator = edgePartition.iterator.map { e =>
et.set(e)
et.srcAttr = vPart(e.srcId)
et.dstAttr = vPart(e.dstId)
et
}
// Apply the user function to the vertex partition
val outputIter = f(ePid, inputIterator)
// Consume the iterator to update the edge attributes
val newEdgePartition = edgePartition.map(outputIter)
Iterator((ePid, newEdgePartition))
if (ePartIter.hasNext && vTableReplicatedIter.hasNext) {
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vTableReplicatedIter.next()
assert(!vTableReplicatedIter.hasNext)
assert(ePid == vPid)
val et = new EdgeTriplet[VD, ED]
val inputIterator = edgePartition.iterator.map { e =>
et.set(e)
et.srcAttr = vPart(e.srcId)
et.dstAttr = vPart(e.dstId)
et
}
// Apply the user function to the vertex partition
val outputIter = f(ePid, inputIterator)
// Consume the iterator to update the edge attributes
val newEdgePartition = edgePartition.map(outputIter)
Iterator((ePid, newEdgePartition))
} else {
if (ePartIter.hasNext != vTableReplicatedIter.hasNext) {
logError("mapTriplets: Dropped non-empty %s partition".format(
if (ePartIter.hasNext) "edge" else "ReplicatedVertexView"))
}
Iterator.empty
}
}
new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
}
Expand Down Expand Up @@ -216,50 +233,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (

// Map and combine.
val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vPartIter.next()
assert(!vPartIter.hasNext)
assert(ePid == vPid)
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
if (ePartIter.hasNext && vPartIter.hasNext) {
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vPartIter.next()
assert(!vPartIter.hasNext)
assert(ePid == vPid)
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
case Some(EdgeDirection.In) =>
edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
case _ => // None
edgePartition.iterator
}

// Scan edges and run the map function
val et = new EdgeTriplet[VD, ED]
val mapOutputs = edgeIter.flatMap { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vPart(e.srcId)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
if (mapUsesDstAttr) {
et.dstAttr = vPart(e.dstId)
}
case Some(EdgeDirection.In) =>
edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
case _ => // None
edgePartition.iterator
}

// Scan edges and run the map function
val et = new EdgeTriplet[VD, ED]
val mapOutputs = edgeIter.flatMap { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vPart(e.srcId)
mapFunc(et)
}
if (mapUsesDstAttr) {
et.dstAttr = vPart(e.dstId)
// Note: This doesn't allow users to send messages to arbitrary vertices.
vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
} else {
if (ePartIter.hasNext != vPartIter.hasNext) {
logError("mapReduceTriplets: Dropped non-empty %s partition".format(
if (ePartIter.hasNext) "edge" else "vertex"))
}
mapFunc(et)
Iterator.empty
}
// Note: This doesn't allow users to send messages to arbitrary vertices.
vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
}

// do the final reduction reusing the index map
Expand Down
Loading

0 comments on commit 354d154

Please sign in to comment.