ShuffleDependency — Shuffle Dependency

ShuffleDependency is an RDD Dependency on the output of a ShuffleMapStage for a key-value pair RDD.

ShuffleDependency uses the RDD to know the number of map-side (pre-shuffle) partitions and the Partitioner to know the number of reduce-side (post-shuffle) partitions.

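ShuffleDependency is the only dependency of a ShuffledRDD. CoGroupedRDD and SubtractedRDD use a ShuffleDependency too, but only for a parent RDD whose partitioner differs from the partitioner of the resulting RDD. A parent that is already partitioned the same way gets a OneToOneDependency instead, so no shuffle is needed for it.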

A ShuffleDependency is created for a key-value pair RDD, i.e. RDD[Product2[K, V]] with K and V being the types of keys and values, respectively.
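The rule that CoGroupedRDD only shuffles parents whose partitioner differs from its own can be observed directly. The following is a minimal sketch, assuming spark-core is on the classpath and a local-mode SparkContext. The names CoGroupDependencies and dependencyKinds are hypothetical. The sketch builds a CoGroupedRDD over two parents and labels each dependency. One parent is already hash-partitioned with the target partitioner, and the other has no partitioner.

```scala
import org.apache.spark.{HashPartitioner, OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.CoGroupedRDD

object CoGroupDependencies {
  // Labels each dependency of a CoGroupedRDD built over two parents.
  def dependencyKinds(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cogroup-deps"))
    try {
      val part = new HashPartitioner(4)
      // Already partitioned with the same partitioner as the CoGroupedRDD below.
      val left = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part)
      // No partitioner at all, so its records have to be redistributed.
      val right = sc.parallelize(Seq(1 -> "x", 3 -> "y"))
      val cogrouped = new CoGroupedRDD[Int](Seq(left, right), part)
      cogrouped.dependencies.map {
        case _: OneToOneDependency[_]      => "one-to-one"
        case _: ShuffleDependency[_, _, _] => "shuffle"
        case other                         => other.getClass.getSimpleName
      }
    } finally sc.stop()
  }
}
```

dependencyKinds() should return Seq("one-to-one", "shuffle"). Only the unpartitioned parent needs a ShuffleDependency.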

Tip
Use the dependencies method of an RDD to list its dependencies.
scala> val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)
rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:24

scala> rdd.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@454f6cc5)

Every ShuffleDependency has a unique application-wide shuffleId number that is assigned when ShuffleDependency is created (and is used throughout Spark’s code to reference a ShuffleDependency).

Note
Shuffle ids are tracked by SparkContext.
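Each ShuffleDependency gets its own shuffleId. The sketch below shows this by creating two independent groupBy shuffles over the same RDD and reading the shuffleId of each. It is a minimal sketch, assuming spark-core on the classpath and a local-mode SparkContext, and ShuffleIds and twoShuffleIds are hypothetical names. As in the Tip earlier, groupBy returns a ShuffledRDD whose single dependency is a ShuffleDependency. The concrete id values depend on how many shuffles the SparkContext has already registered, so the sketch only compares them.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShuffleIds {
  // Assumes the RDD's only dependency is a ShuffleDependency (true for a ShuffledRDD).
  private def shuffleDependencyOf(rdd: RDD[_]): ShuffleDependency[_, _, _] =
    rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

  // Returns the shuffleIds of two independent shuffles created by the same SparkContext.
  def twoShuffleIds(): (Int, Int) = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-ids"))
    try {
      val nums = sc.parallelize(0 to 8)
      val byMod3 = nums.groupBy(_ % 3)
      val byMod2 = nums.groupBy(_ % 2)
      (shuffleDependencyOf(byMod3).shuffleId, shuffleDependencyOf(byMod2).shuffleId)
    } finally sc.stop()
  }
}
```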

keyOrdering Property

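keyOrdering: Option[Ordering[K]] = None

keyOrdering is the optional ordering of keys (of Scala’s scala.math.Ordering type) that is specified when ShuffleDependency is created.

keyOrdering is by default undefined (i.e. None).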

serializer Property

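serializer: Serializer

serializer is the Serializer specified when ShuffleDependency is created. By default, it is the Serializer of the current SparkEnv.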

Creating ShuffleDependency Instance

ShuffleDependency takes the following when created:

  1. A single key-value pair RDD, i.e. RDD[Product2[K, V]],

  2. Partitioner (available as partitioner property),

  3. Serializer,

  4. Optional key ordering (of Scala’s scala.math.Ordering type),

  5. Optional Aggregator,

  6. mapSideCombine flag which is disabled (i.e. false) by default.

Note
By default, ShuffleDependency uses SparkEnv to access the current Serializer.

When created, ShuffleDependency gets a new shuffle id (available as the shuffleId property).

Note
ShuffleDependency uses the input RDD to access SparkContext, which hands out the shuffleId.

ShuffleDependency registers itself with ShuffleManager and gets a ShuffleHandle (available as shuffleHandle property).

Note
ShuffleDependency accesses ShuffleManager using SparkEnv.

In the end, ShuffleDependency registers itself for cleanup with ContextCleaner.

Note
ShuffleDependency accesses the optional ContextCleaner through SparkContext.

Note
ShuffleDependency is created when ShuffledRDD, CoGroupedRDD, and SubtractedRDD return their RDD dependencies.
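You can exercise the creation steps by constructing a ShuffleDependency directly, since it is a developer API. The following is a minimal sketch, assuming spark-core on the classpath. It also needs a running local-mode SparkContext, because the default Serializer, the shuffle id, and the ShuffleManager registration all go through SparkEnv and SparkContext. CreateShuffleDependency and Observed are hypothetical names. Only the RDD and the Partitioner are passed, so the remaining parameters take their defaults.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object CreateShuffleDependency {
  final case class Observed(
      numPartitions: Int,
      hasKeyOrdering: Boolean,
      hasAggregator: Boolean,
      mapSideCombine: Boolean,
      handleMatchesShuffleId: Boolean)

  def observe(): Observed = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("create-shuffle-dep"))
    try {
      val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
      // serializer, keyOrdering, aggregator and mapSideCombine fall back to defaults.
      // Construction also gets a shuffleId and registers with the ShuffleManager.
      val dep = new ShuffleDependency[Int, String, String](pairs, new HashPartitioner(2))
      Observed(
        dep.partitioner.numPartitions,
        dep.keyOrdering.isDefined,
        dep.aggregator.isDefined,
        dep.mapSideCombine,
        dep.shuffleHandle.shuffleId == dep.shuffleId)
    } finally sc.stop()
  }
}
```

The expected result is Observed(2, false, false, false, true): two reduce-side partitions, no key ordering, no aggregator, no map-side combine, and a ShuffleHandle that carries the same shuffle id.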

partitioner Property

partitioner property is the Partitioner that is used to partition the shuffle output, i.e. to decide which reduce-side (post-shuffle) partition every key goes to.

partitioner is specified when ShuffleDependency is created.
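A plain-Scala sketch of hash partitioning illustrates how a Partitioner assigns shuffle output to reduce-side partitions. Hash partitioning is the strategy behind Spark's HashPartitioner: a non-negative modulo of the key's hashCode, with null keys sent to partition 0. The sketch does not depend on Spark. HashPartitioningSketch, partitionFor, and assign are hypothetical names, and a real ShuffleDependency may be given any Partitioner.

```scala
object HashPartitioningSketch {
  // Non-negative modulo of the key's hashCode; null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    key match {
      case null => 0
      case k =>
        val raw = k.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw
    }
  }

  // Groups map-side records by the reduce-side partition they would be shuffled to.
  def assign[K, V](records: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

For Int keys the hashCode is the value itself. With 3 partitions, key 7 lands in partition 1, and key -1 lands in partition 2 rather than -1.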

shuffleHandle Property

shuffleHandle: ShuffleHandle

shuffleHandle is the ShuffleHandle of a ShuffleDependency, assigned eagerly when the ShuffleDependency is created.

Note
shuffleHandle is used to compute CoGroupedRDD, ShuffledRDD, SubtractedRDD, and ShuffledRowRDD (to get a ShuffleReader for a ShuffleDependency) and when a ShuffleMapTask runs (to get a ShuffleWriter for a ShuffleDependency).

Map-Side Combine Flag — mapSideCombine Attribute

mapSideCombine is a flag to control whether to use partial aggregation (aka map-side combine).

mapSideCombine is by default disabled (i.e. false) when creating a ShuffleDependency.

When enabled, SortShuffleWriter and BlockStoreShuffleReader assume that an Aggregator is also defined.

Note
mapSideCombine is only set (and hence can only be enabled) when ShuffledRDD returns its dependencies (a single ShuffleDependency).
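The effect of the flag can be illustrated without Spark. The following plain-Scala sketch models a shuffle as a sequence of map-side partitions. It aggregates with a hypothetical SimpleAggregator that mirrors only the three functions of Spark's Aggregator (createCombiner, mergeValue, mergeCombiners). With map-side combine, each map partition emits at most one partially aggregated record per key, and the reduce side merges those records with mergeCombiners. Without it, every raw record crosses the shuffle. This models the idea only; it is not how SortShuffleWriter or BlockStoreShuffleReader are implemented.

```scala
object MapSideCombineSketch {
  // Hypothetical stand-in for Spark's Aggregator: the same three functions, nothing more.
  final case class SimpleAggregator[V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)

  // Aggregates raw values per key with createCombiner and mergeValue.
  private def combineValues[K, V, C](records: Seq[(K, V)], agg: SimpleAggregator[V, C]): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).fold(agg.createCombiner(v))(c => agg.mergeValue(c, v)))
    }

  /** Returns (number of records crossing the shuffle, final per-key result). */
  def run[K, V, C](
      mapPartitions: Seq[Seq[(K, V)]],
      agg: SimpleAggregator[V, C],
      mapSideCombine: Boolean): (Int, Map[K, C]) =
    if (mapSideCombine) {
      // Map side: partial aggregation within each partition.
      val shuffled: Seq[(K, C)] = mapPartitions.flatMap(p => combineValues(p, agg).toSeq)
      // Reduce side: merge the partial combiners per key.
      val result = shuffled.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).fold(c)(prev => agg.mergeCombiners(prev, c)))
      }
      (shuffled.size, result)
    } else {
      // No map-side combine: every raw record is shuffled, then aggregated on the reduce side.
      val shuffled: Seq[(K, V)] = mapPartitions.flatten
      (shuffled.size, combineValues(shuffled, agg))
    }
}
```

Take two partitions, Seq("a" -> 1, "a" -> 2, "b" -> 3) and Seq("a" -> 4, "b" -> 5, "b" -> 6), and sum the values. Both modes produce Map("a" -> 7, "b" -> 14). With map-side combine only 4 records are shuffled; without it, all 6 are.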

aggregator Property

aggregator: Option[Aggregator[K, V, C]] = None

aggregator is a map/reduce-side Aggregator (for an RDD’s shuffle).

aggregator is by default undefined (i.e. None) when ShuffleDependency is created.
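You can see which operations set aggregator and mapSideCombine by inspecting the ShuffleDependency behind a few pair-RDD operations. The following is a minimal sketch, assuming spark-core on the classpath and a local-mode SparkContext; AggregatorFlags and observe are hypothetical names. In current Spark, reduceByKey and groupByKey both go through combineByKeyWithClassTag, and groupByKey turns map-side combine off. partitionBy creates a ShuffledRDD with no aggregator.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AggregatorFlags {
  // (aggregator defined?, mapSideCombine) for an RDD whose only dependency is a ShuffleDependency.
  private def flags(rdd: RDD[_]): (Boolean, Boolean) = {
    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
    (dep.aggregator.isDefined, dep.mapSideCombine)
  }

  def observe(): Map[String, (Boolean, Boolean)] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("aggregator-flags"))
    try {
      // No partitioner, so each operation below creates a new ShuffledRDD.
      val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
      Map(
        "reduceByKey" -> flags(pairs.reduceByKey(_ + _)),
        "groupByKey"  -> flags(pairs.groupByKey()),
        "partitionBy" -> flags(pairs.partitionBy(new HashPartitioner(2))))
    } finally sc.stop()
  }
}
```

The expected flags are (true, true) for reduceByKey, (true, false) for groupByKey, and (false, false) for partitionBy.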

Usage

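The places where ShuffleDependency is used:

  • ShuffledRDD, CoGroupedRDD, and SubtractedRDD (when they return their RDD dependencies)

  • ShuffledRowRDD

The RDD operations that may or may not use the above RDDs and hence shuffle: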

  • coalesce

  • cogroup

    • intersection

  • subtractByKey

    • subtract

  • sortByKey

    • sortBy

  • repartitionAndSortWithinPartitions

  • combineByKeyWithClassTag

    • combineByKey

    • aggregateByKey

    • foldByKey

    • reduceByKey

    • countApproxDistinctByKey

    • groupByKey

  • partitionBy

Note
There may be other dependent methods that use the above.
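You can check whether a given operation actually shuffles by walking an RDD's lineage for a ShuffleDependency. The following is a minimal sketch, assuming spark-core on the classpath and a local-mode SparkContext; LineageShuffles, hasShuffle, and observe are hypothetical names. Of the operations in the list, coalesce is the one that may or may not shuffle, depending on its shuffle argument. map is included only as a narrow-dependency contrast.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LineageShuffles {
  // True if any RDD in the lineage depends on its parent through a ShuffleDependency.
  def hasShuffle(rdd: RDD[_]): Boolean =
    rdd.dependencies.exists {
      case _: ShuffleDependency[_, _, _] => true
      case narrow                        => hasShuffle(narrow.rdd)
    }

  def observe(): Map[String, Boolean] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-shuffles"))
    try {
      val nums = sc.parallelize(1 to 100, 4)
      Map(
        "coalesce(2)"                 -> hasShuffle(nums.coalesce(2)),
        "coalesce(2, shuffle = true)" -> hasShuffle(nums.coalesce(2, shuffle = true)),
        "map"                         -> hasShuffle(nums.map(_ * 2)),
        "sortBy"                      -> hasShuffle(nums.sortBy(x => x)))
    } finally sc.stop()
  }
}
```

Plain coalesce(2) and map use only narrow dependencies. coalesce(2, shuffle = true) and sortBy have a ShuffledRDD, and hence a ShuffleDependency, in their lineage.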