Skip to content

Commit

Permalink
✨ (feat) Filter on all specified partitions DeltaTableOptimization
Browse files Browse the repository at this point in the history
  • Loading branch information
zoemcl committed Sep 12, 2024
1 parent a821576 commit 50f1285
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package bio.ferlab.datalake.spark3.etl.v4
package bio.ferlab.datalake.spark3.utils

import bio.ferlab.datalake.commons.config.{Configuration, DatasetConf}
import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits.DatasetConfOperations
Expand All @@ -10,13 +10,21 @@ trait DeltaTableOptimization {
def optimize(ds: DatasetConf, numberOfVersions: Int)(implicit conf: Configuration, spark: SparkSession): Unit = {
  /**
   * Compacts the Delta table backing `ds`, then vacuums old versions.
   *
   * When partition columns are declared, only the latest partition (the max
   * value of every partition column) is compacted, avoiding a rewrite of the
   * entire table.
   *
   * @param ds               dataset configuration of the Delta table to optimize
   * @param numberOfVersions number of table versions to retain when vacuuming
   */
  val df = ds.read

  if (ds.partitionby.isEmpty) {
    // No partitioning declared: compact the whole table.
    compact(ds)
  } else {
    // One filter clause per partition column, each pinned to its max value.
    // NOTE(review): this issues one scan per partition column; assumes the
    // partition-column count is small — confirm if tables with many
    // partition columns are expected.
    val maxPartitions = ds.partitionby.map { p =>
      p -> df.select(max(col(p))).collect().head.get(0)
    }
    // Quote each value so string-typed partition columns compare correctly;
    // previously the multi-column branch omitted quotes and the single-column
    // branch interpolated the whole Seq instead of its head.
    val partitionFilter = maxPartitions
      .map { case (column, value) => s"$column='${value.toString}'" }
      .mkString(" AND ")
    compact(ds, Some(partitionFilter))
  }

  vacuum(ds, numberOfVersions)
}
Expand Down

0 comments on commit 50f1285

Please sign in to comment.