S3a support
jozefbakus committed Oct 5, 2023
1 parent f84055c commit 59a9cbb
Showing 6 changed files with 28 additions and 37 deletions.
README.md: 7 additions, 0 deletions
@@ -52,6 +52,7 @@ In every run mode, the tool offers following universal features:
 - Backup is deleted after a successful run (can be overridden to keep the backup)
 - Currently supported file systems:
   - S3
+  - S3a
   - Unix
   - HDFS

@@ -142,6 +143,12 @@
 To be able to perform any operation on S3 you must provide AWS credentials. The easiest way to do so is to set environment variables
 `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
 ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html)
 
+### S3a Set up
+To be able to perform any operation on S3a you must provide AWS credentials. The easiest way to do so is to set environment variables
+`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
+ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
+Additionally, to set the provider endpoint, the environment variable `AWS_ENDPOINT_URL` has to be set.
+
 ### HDFS Set up
 To be able to perform any operation on HDFS you must set environment variable `HADOOP_CONF_DIR`.
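The endpoint override described in the new S3a section can be exercised against any S3-compatible store (e.g. MinIO or LocalStack). A minimal sketch, assuming the AWS SDK for Java v2 that this project already uses; the object and helper names here are illustrative, not part of the commit:

```scala
import java.net.URI
import software.amazon.awssdk.services.s3.S3Client

// Hypothetical helper mirroring the setup the README describes:
// credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY via the
// SDK's default provider chain; AWS_ENDPOINT_URL optionally redirects the client.
object EndpointAwareClient {
  def build(): S3Client = {
    val builder = S3Client.builder()
    Option(System.getenv("AWS_ENDPOINT_URL")) // null when the variable is unset
      .filter(_.nonEmpty)
      .foreach(url => builder.endpointOverride(URI.create(url)))
    builder.build()
  }
}
```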
src/main/scala/za/co/absa/spark_metadata_tool/Application.scala: 12 additions, 20 deletions
@@ -24,24 +24,9 @@ import org.log4s.Logger
 import software.amazon.awssdk.services.s3.S3Client
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
 import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager}
-import za.co.absa.spark_metadata_tool.model.{
-  AppConfig,
-  AppError,
-  AppErrorWithThrowable,
-  CompareMetadataWithData,
-  CreateMetadata,
-  FixPaths,
-  Hdfs,
-  InitializationError,
-  Merge,
-  NotFoundError,
-  S3,
-  SinkFileStatus,
-  TargetFilesystem,
-  Unix,
-  UnknownError
-}
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}
 
+import java.net.URI
 import scala.util.Try
 import scala.util.chaining._
 
@@ -223,8 +208,14 @@ object Application extends App {
   } yield ()).tap(_.logInfo(s"Done processing file ${path.toString}"))
 
   def initS3(): Either[AppError, S3Client] = Try {
-    S3Client.builder().build()
-  }.toEither.leftMap(err => InitializationError("Failed to initialize S3 Client", err.some))
+    // This is done because the AWS SDK does not support overriding the AWS endpoint URL via an environment variable:
+    // https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings
+    val endpoint = Option(System.getenv("AWS_ENDPOINT_URL")).getOrElse("") // getenv returns null when the variable is unset
+    val builder  = S3Client.builder()
+    if (endpoint.nonEmpty) builder.endpointOverride(new URI(endpoint))
+
+    builder.build()
+  }.toEither.leftMap(err => InitializationError("Failed to initialize S3A Client", err.some))
 
   def initHdfs(): Either[AppError, FileSystem] = Try {
     val hadoopConfDir = sys.env("HADOOP_CONF_DIR")
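The `Try { ... }.toEither.leftMap(...)` shape used by `initS3` and `initHdfs` is a small recurring pattern: exceptions thrown during initialization are captured and converted into the tool's typed errors. A self-contained sketch, assuming Cats for `leftMap` (which the surrounding `.some`/`.asRight` syntax suggests); the names below are illustrative, not the project's actual types:

```scala
import scala.util.Try
import cats.syntax.either._ // provides leftMap on Either

object SafeInitSketch extends App {
  final case class InitializationError(msg: String, cause: Option[Throwable])

  // Run a side-effecting initializer, turning any thrown exception into a typed error.
  def safeInit[A](what: String)(init: => A): Either[InitializationError, A] =
    Try(init).toEither.leftMap(err => InitializationError(s"Failed to initialize $what", Some(err)))

  println(safeInit("S3 client")(42))                          // Right(42)
  println(safeInit("S3 client")(sys.error("no credentials"))) // Left(InitializationError(...))
}
```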
@@ -242,7 +233,8 @@
     (fs match {
       case Unix => UnixFileManager.asRight
       case Hdfs => initHdfs().map(hdfs => HdfsFileManager(hdfs))
-      case S3 => initS3().map(client => S3FileManager(client))
+      case S3 => initS3().map(client => S3FileManager(client, "s3"))
+      case S3a => initS3().map(client => S3FileManager(client, "s3a"))
     }).tap(fm => logger.debug(s"Initialized file manager : $fm"))
 
 }
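Note how `S3` and `S3a` share one client initializer and differ only in the scheme string handed to `S3FileManager`. A stripped-down, self-contained sketch of that dispatch, with illustrative names rather than the project's types:

```scala
object FsDispatchSketch extends App {
  sealed trait Fs
  case object UnixFs extends Fs
  case object S3Fs extends Fs
  case object S3aFs extends Fs

  sealed trait Manager
  case object LocalManager extends Manager
  final case class ObjectStoreManager(scheme: String) extends Manager

  // One object-store implementation serves both schemes; only the URI scheme differs.
  def managerFor(fs: Fs): Manager = fs match {
    case UnixFs => LocalManager
    case S3Fs   => ObjectStoreManager("s3")
    case S3aFs  => ObjectStoreManager("s3a")
  }

  assert(managerFor(S3aFs) == ObjectStoreManager("s3a"))
}
```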
src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala: 2 additions, 13 deletions
@@ -25,19 +25,7 @@ import org.apache.log4j.PatternLayout
 import org.log4s.Logger
 import scopt.OParser
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
-import za.co.absa.spark_metadata_tool.model.AppConfig
-import za.co.absa.spark_metadata_tool.model.AppError
-import za.co.absa.spark_metadata_tool.model.CompareMetadataWithData
-import za.co.absa.spark_metadata_tool.model.FixPaths
-import za.co.absa.spark_metadata_tool.model.Hdfs
-import za.co.absa.spark_metadata_tool.model.InitializationError
-import za.co.absa.spark_metadata_tool.model.Merge
-import za.co.absa.spark_metadata_tool.model.ParsingError
-import za.co.absa.spark_metadata_tool.model.S3
-import za.co.absa.spark_metadata_tool.model.TargetFilesystem
-import za.co.absa.spark_metadata_tool.model.Unix
-import za.co.absa.spark_metadata_tool.model.UnknownFileSystemError
-import za.co.absa.spark_metadata_tool.model.CreateMetadata
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}
 
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
@@ -182,6 +170,7 @@
       case _ if path.startsWith("/") => Unix.asRight
       case _ if path.startsWith("hdfs://") => Hdfs.asRight
       case _ if path.startsWith("s3://") => S3.asRight
+      case _ if path.startsWith("s3a://") => S3a.asRight
       case _ =>
         UnknownFileSystemError(
           s"Couldn't extract filesystem from path $path"
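The new `s3a://` case slots in beside the existing prefixes. Because `"s3a://"` does not start with `"s3://"` (the fourth character differs), the order of the two cases cannot cause a mismatch. A self-contained sketch of the detection logic, with illustrative names:

```scala
object FsDetectSketch extends App {
  sealed trait Filesystem
  case object Unix extends Filesystem
  case object Hdfs extends Filesystem
  case object S3 extends Filesystem
  case object S3a extends Filesystem

  def detect(path: String): Either[String, Filesystem] = path match {
    case p if p.startsWith("/")       => Right(Unix)
    case p if p.startsWith("hdfs://") => Right(Hdfs)
    case p if p.startsWith("s3://")   => Right(S3)
    case p if p.startsWith("s3a://")  => Right(S3a)
    case p                            => Left(s"Couldn't extract filesystem from path $p")
  }

  // "s3a://..." never matches the plain "s3://" prefix, so the two cases don't clash.
  assert(detect("s3://bucket/data") == Right(S3))
  assert(detect("s3a://bucket/data") == Right(S3a))
}
```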
src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala: 3 additions, 3 deletions
@@ -45,7 +45,7 @@ import scala.util.Try
 import scala.util.Using
 import scala.util.chaining._
 
-case class S3FileManager(s3: S3Client) extends FileManager {
+case class S3FileManager(s3: S3Client, schema: String) extends FileManager {
   import S3FileManager._
   implicit private val logger: Logger = org.log4s.getLogger
 
@@ -124,7 +124,7 @@ case class S3FileManager(s3: S3Client) extends FileManager {
     val bucket = getBucket(baseDir)
     val prefix = ensureTrailingSlash(getKey(baseDir))
 
-    val builder = new URIBuilder().setScheme("s3").setHost(bucket)
+    val builder = new URIBuilder().setScheme(schema).setHost(bucket)
 
     val request = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()
 
@@ -171,7 +171,7 @@

   private def listBucket(path: Path, filter: FileType): Either[IoError, Seq[Path]] = Try {
     val bucket = getBucket(path)
-    val pathPrefix = s"s3://$bucket/"
+    val pathPrefix = s"$schema://$bucket/"
     val rootKey = path.toString.stripPrefix(pathPrefix)
 
     val req = ListObjectsV2Request
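With the hard-coded `"s3"` replaced by the injected `schema`, every URI the manager reconstructs carries the caller's scheme. A quick sketch, assuming Apache HttpComponents' `URIBuilder`, which matches the fluent `setScheme`/`setHost` usage in this file; the helper name is illustrative:

```scala
import org.apache.http.client.utils.URIBuilder

object SchemeUriSketch extends App {
  // Rebuild a fully-qualified object URI from scheme, bucket, and key.
  def fileUri(scheme: String, bucket: String, key: String) =
    new URIBuilder().setScheme(scheme).setHost(bucket).setPath("/" + key).build()

  println(fileUri("s3", "my-bucket", "data/part-0000"))  // s3://my-bucket/data/part-0000
  println(fileUri("s3a", "my-bucket", "data/part-0000")) // s3a://my-bucket/data/part-0000
}
```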
src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFilesystem.scala: 3 additions, 0 deletions
@@ -29,3 +29,6 @@ case object Hdfs extends TargetFilesystem {
 case object S3 extends TargetFilesystem {
   override def pathPrefix: String = "s3://"
 }
+case object S3a extends TargetFilesystem {
+  override def pathPrefix: String = "s3a://"
+}
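A hypothetical usage sketch of `pathPrefix`: callers can qualify or strip paths without hard-coding the scheme, so the same code handles both `s3://` and `s3a://` targets. The trait and helper below are illustrative reconstructions, not part of the commit:

```scala
object PathPrefixSketch extends App {
  sealed trait TargetFilesystem { def pathPrefix: String }
  case object S3 extends TargetFilesystem { val pathPrefix = "s3://" }
  case object S3a extends TargetFilesystem { val pathPrefix = "s3a://" }

  // Strip the filesystem prefix to get the bucket-relative part of a path.
  def relativize(fs: TargetFilesystem, fullPath: String): String =
    fullPath.stripPrefix(fs.pathPrefix)

  assert(relativize(S3a, "s3a://bucket/table/_spark_metadata") == "bucket/table/_spark_metadata")
}
```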
src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala: 1 addition, 1 deletion
@@ -45,7 +45,7 @@ class S3FileManagerSpec extends AnyFlatSpec with Matchers with OptionValues with

   private val s3 = mock[S3Client]
 
-  private val io = S3FileManager(s3)
+  private val io = S3FileManager(s3, "s3")
 
   private val TMinus10 = Instant.now().minus(Duration.ofMinutes(10))
 
