From 94f723f507fbd1677fdd0d74f1f55182319bdba1 Mon Sep 17 00:00:00 2001 From: spenes Date: Wed, 6 Sep 2023 01:57:29 +0300 Subject: [PATCH] Wrap sqs sink with effects --- build.sbt | 8 +- sqs/src/main/resources/application.conf | 22 ---- .../SqsCollector.scala | 62 ++++------ .../sinks/SqsSink.scala | 51 ++++++-- .../sinks/SqsSinkConfig.scala | 25 ++++ .../SqsConfigSpec.scala | 113 +++++++++++++++++- 6 files changed, 201 insertions(+), 80 deletions(-) create mode 100644 sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala diff --git a/build.sbt b/build.sbt index 6165766b7..d291d7ba4 100644 --- a/build.sbt +++ b/build.sbt @@ -186,10 +186,12 @@ lazy val kinesisDistroless = project .configs(IntegrationTest) lazy val sqsSettings = - allSettings ++ buildInfoSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( moduleName := "snowplow-stream-collector-sqs", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", Docker / packageName := "scala-stream-collector-sqs", libraryDependencies ++= Seq( + Dependencies.Libraries.catsRetry, Dependencies.Libraries.sqs, Dependencies.Libraries.sts, ) @@ -198,14 +200,14 @@ lazy val sqsSettings = lazy val sqs = project .settings(sqsSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val sqsDistroless = project .in(file("distroless/sqs")) .settings(sourceDirectory := (sqs / sourceDirectory).value) .settings(sqsSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val pubsubSettings = allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( diff --git a/sqs/src/main/resources/application.conf b/sqs/src/main/resources/application.conf index 0c6651bd5..fc5230845 100644 --- a/sqs/src/main/resources/application.conf +++ b/sqs/src/main/resources/application.conf @@ -27,25 +27,3 @@ collector { } } } - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala index 53c964c40..806fc93e6 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala @@ -15,48 +15,32 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import java.util.concurrent.ScheduledThreadPoolExecutor -import cats.syntax.either._ -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService -object SqsCollector extends Collector { - def appName = BuildInfo.shortName - def 
appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion +import cats.effect.{IO, Resource} - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks: Either[Throwable, CollectorSinks] = for { - sqs <- collectorConf.streams.sink match { - case sqs: Sqs => sqs.asRight - case sink => new IllegalArgumentException(s"Configured sink $sink is not SQS.").asLeft - } - es = new ScheduledThreadPoolExecutor(sqs.threadPoolSize) - goodQueue = collectorConf.streams.good - badQueue = collectorConf.streams.bad - bufferConf = collectorConf.streams.buffer - good <- SqsSink.createAndInitialize( - sqs.maxBytes, - sqs, - bufferConf, - goodQueue, - es +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ + +object SqsCollector extends App[SqsSinkConfig](BuildInfo) { + + override def mkSinks(config: Config.Streams[SqsSinkConfig]): Resource[IO, Sinks[IO]] = { + val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize) + for { + good <- SqsSink.create[IO]( + config.sink.maxBytes, + config.sink, + config.buffer, + config.good, + threadPoolExecutor ) - bad <- SqsSink.createAndInitialize( - sqs.maxBytes, - sqs, - bufferConf, - badQueue, - es + bad <- SqsSink.create[IO]( + config.sink.maxBytes, + config.sink, + config.buffer, + config.bad, + threadPoolExecutor ) - } yield CollectorSinks(good, bad) - - sinks match { - case Right(s) => run(collectorConf, akkaConf, s, telemetry) - case Left(e) => throw e - } + } yield Sinks(good, bad) } } diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index b3e388ad8..3577ea2c7 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -12,6 +12,11 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks +import cats.effect.{Resource, Sync} +import cats.implicits.catsSyntaxMonadErrorRethrow + +import org.slf4j.LoggerFactory + import java.nio.ByteBuffer import java.util.UUID import java.util.concurrent.ScheduledExecutorService @@ -35,18 +40,20 @@ import com.amazonaws.auth.{ import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry} -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} -class SqsSink private ( +class SqsSink[F[_]: Sync] private ( val maxBytes: Int, client: AmazonSQS, - sqsConfig: Sqs, - bufferConfig: BufferConfig, + sqsConfig: SqsSinkConfig, + bufferConfig: Config.Buffer, queueName: String, executorService: ScheduledExecutorService -) extends Sink { +) extends Sink[F] { import SqsSink._ + private lazy val log = LoggerFactory.getLogger(getClass()) + private val ByteThreshold: Long = bufferConfig.byteLimit private val RecordThreshold: Long = bufferConfig.recordLimit private val TimeThreshold: Long = bufferConfig.timeLimit @@ -62,10 +69,10 @@ class SqsSink private ( 
concurrent.ExecutionContext.fromExecutorService(executorService) @volatile private var sqsHealthy: Boolean = false - override def isHealthy: Boolean = sqsHealthy + override def isHealthy: F[Boolean] = Sync[F].pure(sqsHealthy) - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - events.foreach(e => EventStorage.store(e, key)) + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + Sync[F].delay(events.foreach(e => EventStorage.store(e, key))) object EventStorage { private val storedEvents = ListBuffer.empty[Events] @@ -281,18 +288,36 @@ object SqsSink { // Details about why messages failed to be written to SQS. final case class BatchResultErrorInfo(code: String, message: String) + def create[F[_]: Sync]( + maxBytes: Int, + sqsConfig: SqsSinkConfig, + bufferConfig: Config.Buffer, + queueName: String, + executorService: ScheduledExecutorService + ): Resource[F, SqsSink[F]] = { + val acquire = + Sync[F] + .delay( + createAndInitialize(maxBytes, sqsConfig, bufferConfig, queueName, executorService) + ) + .rethrow + val release = (sink: SqsSink[F]) => Sync[F].delay(sink.shutdown()) + + Resource.make(acquire)(release) + } + /** * Create an SqsSink and schedule a task to flush its EventStorage. * Exists so that no threads can get a reference to the SqsSink * during its construction. */ - def createAndInitialize( + def createAndInitialize[F[_]: Sync]( maxBytes: Int, - sqsConfig: Sqs, - bufferConfig: BufferConfig, + sqsConfig: SqsSinkConfig, + bufferConfig: Config.Buffer, queueName: String, executorService: ScheduledExecutorService - ): Either[Throwable, SqsSink] = { + ): Either[Throwable, SqsSink[F]] = { val client = for { provider <- getProvider(sqsConfig.aws) client <- createSqsClient(provider, sqsConfig.region) @@ -307,7 +332,7 @@ object SqsSink { } /** Create an aws credentials provider through env variables and iam. 
*/
-  private def getProvider(awsConfig: AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
+  private def getProvider(awsConfig: SqsSinkConfig.AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
     def isDefault(key: String): Boolean = key == "default"
     def isIam(key: String): Boolean = key == "iam"
     def isEnv(key: String): Boolean = key == "env"
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala
new file mode 100644
index 000000000..b3cbeba30
--- /dev/null
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala
@@ -0,0 +1,25 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
+
+import io.circe.Decoder
+import io.circe.generic.semiauto._
+
+import com.snowplowanalytics.snowplow.collector.core.Config
+
+final case class SqsSinkConfig(
+  maxBytes: Int,
+  region: String,
+  backoffPolicy: SqsSinkConfig.BackoffPolicyConfig,
+  aws: SqsSinkConfig.AWSConfig,
+  threadPoolSize: Int
+) extends Config.Sink
+
+object SqsSinkConfig {
+  final case class AWSConfig(accessKey: String, secretKey: String)
+
+  final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
+
+  implicit val configDecoder: Decoder[SqsSinkConfig] = deriveDecoder[SqsSinkConfig]
+  implicit val backoffPolicyDecoder: Decoder[BackoffPolicyConfig] = deriveDecoder[BackoffPolicyConfig]
+  implicit val awsConfigDecoder: Decoder[AWSConfig] = deriveDecoder[AWSConfig]
+
+}
diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
index 690c63d44..cd83f57dd 100644
--- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
+++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala
@@ -18,8 +18,115 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream
 
-import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec
+import cats.effect.testing.specs2.CatsEffect
+import cats.effect.{ExitCode, IO}
+import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser}
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSinkConfig
+import org.http4s.SameSite
+import org.specs2.mutable.Specification
+
+import java.nio.file.Paths
+import scala.concurrent.duration.DurationInt
+
+class SqsConfigSpec extends Specification with CatsEffect {
+
+  "Config parser" should {
+    "be able to parse extended sqs config" in {
+      assert(
+        resource = "/config.sqs.extended.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+    "be able to parse minimal sqs config" in {
+      assert(
+        resource = "/config.sqs.minimal.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+  }
+
+  private def assert(resource: String, expectedResult: Either[ExitCode, Config[SqsSinkConfig]]) = {
+    val path = Paths.get(getClass.getResource(resource).toURI)
+    ConfigParser.fromPath[IO, SqsSinkConfig](Some(path)).value.map { result =>
+      result must beEqualTo(expectedResult)
+    }
+  }
+}
+
+object SqsConfigSpec {
+
+  private val expectedConfig = Config[SqsSinkConfig](
+    interface = "0.0.0.0",
+    port = 8080,
+    paths = Map.empty[String, String],
+    p3p = Config.P3P(
+      policyRef = "/w3c/p3p.xml",
+      CP = "NOI DSP COR NID PSA OUR IND 
COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + sink = SqsSinkConfig( + maxBytes = 192000, + region = "eu-central-1", + backoffPolicy = SqsSinkConfig.BackoffPolicyConfig( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + aws = SqsSinkConfig.AWSConfig("iam", "iam"), + threadPoolSize = 10 + ) + ) + ) -class SqsConfigSpec extends ConfigSpec { - makeConfigTest("sqs", "", "") }