PageRank causes java.util.NoSuchElementException #52

Open · dcrankshaw opened this issue Nov 7, 2013 · 7 comments

@dcrankshaw (Contributor)
When running PageRank on a cluster, sometimes I hit a NoSuchElementException thrown from somewhere in VertexSetRDD. The full stack trace and command are below. The line numbers may be slightly off due to debugging printlns.

Command:

/root/graphx/run-example org.apache.spark.graph.Analytics spark://ec2-54-224-159-106.compute-1.amazonaws.com:7077 pagerank hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt --numIter=10 --numEPart=128
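
For reference, a minimal sketch of roughly the same workload, written against the org.apache.spark.graphx API of later Spark releases (this repo still uses the org.apache.spark.graph package, so the names below are an approximation for illustration, not the Analytics driver code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.GraphLoader

    object PageRankRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("pagerank-repro"))
        // Load the LiveJournal edge list and split it into 128 edge partitions,
        // mirroring --numEPart=128 in the command above.
        val graph = GraphLoader.edgeListFile(
          sc,
          "hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt",
          numEdgePartitions = 128)
        // Run 10 iterations of static PageRank, mirroring --numIter=10.
        val ranks = graph.staticPageRank(numIter = 10).vertices
        println(ranks.count())
        sc.stop()
      }
    }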

Stack Trace:

java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:314)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:64)
    at org.apache.spark.graph.VertexSetRDD.compute(VertexSetRDD.scala:149)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

ghost assigned jegonzal Nov 7, 2013
@dcrankshaw (Contributor, Author)

Interestingly, #51 seems to fix the issue.

@rxin (Member) commented Nov 7, 2013

simple code is less buggy :)

@jegonzal (Contributor) commented Nov 7, 2013

Hmm, is that error in the master branch? Those line numbers don't seem to line up. I guess for now maybe it makes sense to merge #51 since it is simpler.

@ccsevers commented Jan 9, 2014

I'm running into this issue in the latest version, built off of #132.

The code I'm running is pretty simple:

    val input = sc.sequenceFile[VectorWritable, VectorWritable](
      inputPath, classOf[VectorWritable], classOf[VectorWritable])
    // not even parsing the vectors, just making some big graph
    val edges = input.flatMap {
      case (vec1, vec2) =>
        Seq(Edge(Random.nextLong(), Random.nextLong(), 1),
            Edge(Random.nextLong(), Random.nextLong(), 1))
    }
    val g = Graph.fromEdges(edges, 1)
    val cc = ConnectedComponents.run(g)
    cc.vertices.count()

The error I see is this:
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Lost TID 400 (task 4.0:0)
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Loss was due to java.util.NoSuchElementException
java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:34)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:33)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:242)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

This is running on a Hadoop 2.2.0 cluster with YARN 2.2.0. I've previously run against the same data set with Bagel and it works great.

ankurdave reopened this Jan 9, 2014
@ccsevers commented Jan 9, 2014

Just to make it simpler, you can change the input to something like

    val input = sc.parallelize(1 to 100000000, 400)

and change the flatMap input appropriately to see the same error (a sketch is below).
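
For completeness, a self-contained sketch of that simplified repro, assuming the package layout of later org.apache.spark.graphx releases (Edge/Graph under graphx, ConnectedComponents under graphx.lib) rather than the org.apache.spark.graph layout in this repo:

    import scala.util.Random
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.graphx.lib.ConnectedComponents

    // Same shape as the snippet above, but with a parallelized range instead of
    // the SequenceFile input: two random edges per input element.
    val input = sc.parallelize(1 to 100000000, 400)
    val edges = input.flatMap { _ =>
      Seq(Edge(Random.nextLong(), Random.nextLong(), 1),
          Edge(Random.nextLong(), Random.nextLong(), 1))
    }
    val g = Graph.fromEdges(edges, 1)
    val cc = ConnectedComponents.run(g)
    cc.vertices.count()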

edit: Also, oddly, I can run it with, say, 11 splits on 10 workers and it goes through. I see the same End of Stream error in the logs, but it just fails those tasks and keeps going.

@ankurdave (Member)

Thanks for the report. I haven't yet been able to reproduce this. How many cores does Spark have in your configuration?

@ccsevers

@ankurdave I've tried it with 4-8 cores. I can try with 1 if you think it would help pin down what's going on.

edit: Just to be clear, I mean 10-20 worker nodes with 4-8 cores each. (The machines have 24 cores each, though.)

ankurdave reopened this Jan 18, 2014
ankurdave added a commit to ankurdave/spark that referenced this issue Apr 9, 2014
Empty edge partitions sometimes appear in the output of zipPartitions
for unknown reasons, causing calls to Iterator#next to fail. This commit
checks these cases, handles them by returning an empty iterator, and
logs an error if this would cause GraphX to drop a corresponding
non-empty partition.

Resolves amplab/graphx#52.
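
For readers landing here later, a generic illustration of the guard this commit message describes (this is not the actual GraphX patch; the RDD types, names, and log message below are made up to show the pattern):

    import org.apache.spark.rdd.RDD

    // Sketch only: guard the per-partition iterators handed to zipPartitions
    // instead of calling next() unconditionally, which is what produces the
    // "End of stream" failures in the traces above when a partition is empty.
    def safeZip(left: RDD[Int], right: RDD[Array[Int]]): RDD[Int] =
      left.zipPartitions(right) { (leftIter, rightIter) =>
        if (rightIter.hasNext) {
          val block = rightIter.next()   // safe: hasNext checked first
          leftIter.map(_ + block.length)
        } else {
          if (leftIter.hasNext) {
            // Mirrors the commit's behavior of logging when a non-empty
            // partition would be dropped against an empty one.
            System.err.println("Dropping non-empty partition zipped with an empty partition")
          }
          Iterator.empty
        }
      }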
ankurdave reopened this Apr 14, 2014
ankurdave added a commit to amplab/graphx2 that referenced this issue Apr 22, 2014
(Same commit message as above; cherry picked from commit 2265c87c387979c94275e673a16527f582b2f38a.)
ankurdave added a commit to ankurdave/spark that referenced this issue Apr 25, 2014
(Same commit message as above; cherry picked from 7402177.)