feat: CLIN-3202 Add renameCsvFile util function (#240)
laurabegin authored Sep 23, 2024
1 parent 1f6fbe1 commit 2d96fb6
Showing 4 changed files with 161 additions and 2 deletions.
@@ -0,0 +1,58 @@
package bio.ferlab.datalake.spark3.utils

import bio.ferlab.datalake.commons.config.Format.CSV
import bio.ferlab.datalake.commons.config.{Coalesce, Configuration, DatasetConf, FixedRepartition}
import bio.ferlab.datalake.commons.file.FileSystemResolver
import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits.DatasetConfOperations
import org.apache.spark.sql.{DataFrame, SparkSession}

object CsvUtils {

  /**
   * Renames the CSV output file if the destination format is CSV and the data is repartitioned into a single file.
   *
   * When writing to CSV format, Spark adds the partition information to the filename. This function replaces the
   * partition info with the table name. It also deletes the unnecessary `_SUCCESS` file created by Spark.
   *
   * @param mainDestination The mainDestination [[DatasetConf]] of the ETL
   * @param suffix          Optional suffix added to the file name, before the extension
   * @param spark           An instance of [[SparkSession]]
   * @param conf            The ETL [[Configuration]]
   * @return The renamed CSV loaded as a DataFrame
   * @example
   * This function would rename this CSV file:
   * {{{
   * published/project_name/table_name/part-00000-3afd3298-a186-4289-8ba3-3bf55d27953f-c000.csv
   * }}}
   * to:
   * {{{
   * published/project_name/table_name/table_name_suffix.csv
   * }}}
   * where the suffix could be `v1_0_0`, `2020_01_01`, etc.
   */
  def renameCsvFile(mainDestination: DatasetConf, suffix: Option[String] = None)
                   (implicit spark: SparkSession, conf: Configuration): DataFrame = {
    val isSingleFileCsv = mainDestination.format == CSV &&
      mainDestination.repartition.exists(r => r == FixedRepartition(1) || r == Coalesce(1))

    if (isSingleFileCsv) {
      val fs = FileSystemResolver.resolve(conf.getStorage(mainDestination.storageid).filesystem)
      val files = fs.list(mainDestination.location, recursive = false)

      // Spark writes the data as a single `part-*.csv` file plus an empty `_SUCCESS` marker
      val successFilePath = files
        .filter(_.name == "_SUCCESS")
        .head
        .path
      val csvFilePath = files
        .filter(_.name.startsWith("part-"))
        .head
        .path

      // Rename the part file to `<table name>[_<suffix>].csv` and drop the marker file
      val newPath = mainDestination.location + "/" + mainDestination.path.split("/").last +
        suffix.map("_" + _).getOrElse("") + ".csv"

      fs.move(csvFilePath, newPath, overwrite = true)
      fs.remove(successFilePath)
    }
    mainDestination.read
  }

}
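
For context, a minimal usage sketch — not part of this commit. `mainDestination` stands in for whatever [[DatasetConf]] the ETL publishes, and the `v1_0_0` suffix is illustrative:

// Hypothetical publish step. Assumes `mainDestination` is configured with
// format = CSV and repartition = Some(FixedRepartition(1)) (or Coalesce(1));
// for any other combination, renameCsvFile leaves the files untouched and
// simply reads the dataset back.
val publishedDf: DataFrame = CsvUtils.renameCsvFile(mainDestination, suffix = Some("v1_0_0"))
// The part-*.csv file is now named <table name>_v1_0_0.csv and _SUCCESS is gone.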
@@ -6,8 +6,8 @@ import bio.ferlab.datalake.commons.file.FileSystemType.S3
 trait WithTestConfig {
   val alias = "public_database"

-  private val sc: SimpleConfiguration = ConfigurationLoader.loadFromResources[SimpleConfiguration]("config/reference_kf.conf")
-  implicit val conf: SimpleConfiguration =
+  lazy val sc: SimpleConfiguration = ConfigurationLoader.loadFromResources[SimpleConfiguration]("config/reference_kf.conf")
+  implicit lazy val conf: SimpleConfiguration =
     sc
       .copy(datalake = sc.datalake.copy(storages = List(StorageConf(alias, getClass.getClassLoader.getResource(".").getFile, S3))))
 }
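
The switch from strict to lazy initialization is what lets a suite swap in its own configuration before `sc` is first evaluated. A sketch of the pattern (`MySpec` and `mySuiteDataset` are hypothetical; `CsvUtilsSpec` below does the same thing with its own sources):

// Hypothetical suite overriding the shared test configuration.
// Because `sc` and `conf` are lazy, the override takes effect before first use.
class MySpec extends SparkSpec with WithTestConfig {
  override lazy implicit val conf: SimpleConfiguration =
    sc.copy(datalake = sc.datalake.copy(sources = List(mySuiteDataset))) // mySuiteDataset: a suite-specific DatasetConf
}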
@@ -0,0 +1,87 @@
package bio.ferlab.datalake.spark3.utils

import bio.ferlab.datalake.commons.config.{DatasetConf, FixedRepartition, SimpleConfiguration}
import bio.ferlab.datalake.commons.config.Format.CSV
import bio.ferlab.datalake.commons.config.LoadType.{Insert, OverWrite}
import bio.ferlab.datalake.commons.file.FileSystemResolver
import bio.ferlab.datalake.spark3.loader.LoadResolver
import bio.ferlab.datalake.spark3.testutils.{AirportInput, WithTestConfig}
import bio.ferlab.datalake.testutils.SparkSpec

class CsvUtilsSpec extends SparkSpec with WithTestConfig {
  import spark.implicits._

  val destinationDs: DatasetConf = DatasetConf("destination", alias, "/destination", CSV, OverWrite,
    repartition = Some(FixedRepartition(1)),
    readoptions = Map("inferSchema" -> "true", "header" -> "true"),
    writeoptions = Map("header" -> "true"))
  override lazy implicit val conf: SimpleConfiguration = sc.copy(datalake = sc.datalake.copy(sources = List(destinationDs)))

  val inputData = Seq(
    AirportInput("1", "YYC", "Calgary Int airport"),
    AirportInput("2", "YUL", "Montreal Int airport")
  )

  "renameCsvFile" should "rename file if the format is CSV and data is repartitioned into a single file" in {
    withOutputFolder("root") { root =>
      val updatedConf = updateConfStorages(conf, root)

      val fileName = "destination.csv"
      LoadResolver
        .write(spark, updatedConf)(destinationDs.format -> destinationDs.loadtype)
        .apply(destinationDs, destinationDs.repartition.get.repartition(inputData.toDF()))

      val resultDf = CsvUtils.renameCsvFile(destinationDs)(spark, updatedConf)
      resultDf
        .as[AirportInput]
        .collect() should contain theSameElementsAs inputData

      val files = FileSystemResolver
        .resolve(updatedConf.getStorage(destinationDs.storageid).filesystem)
        .list(destinationDs.location(updatedConf), recursive = true)

      files.size shouldBe 1
      files.head.name shouldBe fileName
    }
  }

  "renameCsvFile" should "preserve the existing files in the destination if load type is insert" in {
    withOutputFolder("root") { root =>
      val updatedConf = updateConfStorages(conf, root)

      val insertDestinationDs = destinationDs.copy(loadtype = Insert)
      val fs = FileSystemResolver.resolve(updatedConf.getStorage(insertDestinationDs.storageid).filesystem)
      val load = LoadResolver.write(spark, updatedConf)(insertDestinationDs.format -> insertDestinationDs.loadtype)

      // Existing data
      val existingData = inputData
      load(insertDestinationDs, insertDestinationDs.repartition.get.repartition(existingData.toDF()))

      val existingDf = CsvUtils.renameCsvFile(insertDestinationDs, suffix = Some("v1"))(spark, updatedConf)
      existingDf
        .as[AirportInput]
        .collect() should contain theSameElementsAs existingData

      val existingFiles = fs.list(insertDestinationDs.location(updatedConf), recursive = true)
      existingFiles.size shouldBe 1
      existingFiles.head.name shouldBe "destination_v1.csv"

      // New data
      val newData = Seq(
        AirportInput("3", "YYZ", "Toronto Int airport"),
        AirportInput("4", "YVR", "Vancouver Int airport")
      )
      load(insertDestinationDs, insertDestinationDs.repartition.get.repartition(newData.toDF()))

      val newDf = CsvUtils.renameCsvFile(insertDestinationDs, suffix = Some("v2"))(spark, updatedConf)
      newDf
        .as[AirportInput]
        .collect() should contain theSameElementsAs existingData ++ newData

      val newFiles = fs.list(insertDestinationDs.location(updatedConf), recursive = true)
      newFiles.size shouldBe 2
      newFiles.count(_.name === "destination_v1.csv") shouldBe 1
      newFiles.count(_.name === "destination_v2.csv") shouldBe 1
    }
  }

}
@@ -1,5 +1,6 @@
package bio.ferlab.datalake.testutils

import bio.ferlab.datalake.commons.config.SimpleConfiguration
import org.apache.commons.io.FileUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
@@ -41,5 +42,18 @@ trait WithSparkSession {
      FileUtils.deleteDirectory(output.toFile)
    }
  }

  /**
   * Changes the configuration storage paths. Can be used together with `withOutputFolder` to replace all
   * Configuration root paths with the temporary folder.
   *
   * @param conf    ETL Configuration
   * @param newPath Path used to replace the storage paths
   * @return Updated Configuration with the new storage paths
   */
  def updateConfStorages(conf: SimpleConfiguration, newPath: String): SimpleConfiguration = {
    val updatedStorages = conf.storages.map { s => s.copy(path = newPath) }
    conf.copy(datalake = conf.datalake.copy(storages = updatedStorages))
  }
}
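
As exercised by `CsvUtilsSpec` above, this helper is typically paired with `withOutputFolder` so that every storage in the configuration points at a throwaway directory; a sketch of the pattern:

// Pattern taken from CsvUtilsSpec: run assertions against a temporary folder.
withOutputFolder("root") { root =>
  val updatedConf = updateConfStorages(conf, root)
  // write and read datasets with updatedConf; the folder is deleted afterwards
}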
