Wrap sqs sink with effects

spenes committed Sep 5, 2023
1 parent cfdaa58 commit 94f723f
Showing 6 changed files with 201 additions and 80 deletions.
8 changes: 5 additions & 3 deletions build.sbt
@@ -186,10 +186,12 @@ lazy val kinesisDistroless = project
   .configs(IntegrationTest)
 
 lazy val sqsSettings =
-  allSettings ++ buildInfoSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
     moduleName := "snowplow-stream-collector-sqs",
     buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
     Docker / packageName := "scala-stream-collector-sqs",
     libraryDependencies ++= Seq(
+      Dependencies.Libraries.catsRetry,
       Dependencies.Libraries.sqs,
       Dependencies.Libraries.sts,
     )
@@ -198,14 +200,14 @@ lazy val sqsSettings =
 lazy val sqs = project
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")
 
 lazy val sqsDistroless = project
   .in(file("distroless/sqs"))
   .settings(sourceDirectory := (sqs / sourceDirectory).value)
   .settings(sqsSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile")
 
 lazy val pubsubSettings =
   allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
22 changes: 0 additions & 22 deletions sqs/src/main/resources/application.conf
@@ -27,25 +27,3 @@ collector {
     }
   }
 }
-
-akka {
-  loglevel = WARNING
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-
-  http.server {
-    remote-address-header = on
-    raw-request-uri-header = on
-
-    parsing {
-      max-uri-length = 32768
-      uri-parsing-mode = relaxed
-      illegal-header-warnings = off
-    }
-
-    max-connections = 2048
-  }
-
-  coordinated-shutdown {
-    run-by-jvm-shutdown-hook = off
-  }
-}
SqsCollector.scala
@@ -15,48 +15,32 @@
 package com.snowplowanalytics.snowplow.collectors.scalastream
 
 import java.util.concurrent.ScheduledThreadPoolExecutor
-import cats.syntax.either._
+
+import cats.effect.{IO, Resource}
+
+import com.snowplowanalytics.snowplow.collector.core.model.Sinks
+import com.snowplowanalytics.snowplow.collector.core.{App, Config}
 import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
-import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSink
-import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._
 
-object SqsCollector extends Collector {
-  def appName      = BuildInfo.shortName
-  def appVersion   = BuildInfo.version
-  def scalaVersion = BuildInfo.scalaVersion
+object SqsCollector extends App[SqsSinkConfig](BuildInfo) {
 
-  def main(args: Array[String]): Unit = {
-    val (collectorConf, akkaConf) = parseConfig(args)
-    val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
-    val sinks: Either[Throwable, CollectorSinks] = for {
-      sqs <- collectorConf.streams.sink match {
-        case sqs: Sqs => sqs.asRight
-        case sink     => new IllegalArgumentException(s"Configured sink $sink is not SQS.").asLeft
-      }
-      es         = new ScheduledThreadPoolExecutor(sqs.threadPoolSize)
-      goodQueue  = collectorConf.streams.good
-      badQueue   = collectorConf.streams.bad
-      bufferConf = collectorConf.streams.buffer
-      good <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        goodQueue,
-        es
+  override def mkSinks(config: Config.Streams[SqsSinkConfig]): Resource[IO, Sinks[IO]] = {
+    val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize)
+    for {
+      good <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.good,
+        threadPoolExecutor
       )
-      bad <- SqsSink.createAndInitialize(
-        sqs.maxBytes,
-        sqs,
-        bufferConf,
-        badQueue,
-        es
+      bad <- SqsSink.create[IO](
+        config.sink.maxBytes,
+        config.sink,
+        config.buffer,
+        config.bad,
+        threadPoolExecutor
      )
-    } yield CollectorSinks(good, bad)
-
-    sinks match {
-      case Right(s) => run(collectorConf, akkaConf, s, telemetry)
-      case Left(e)  => throw e
-    }
+    } yield Sinks(good, bad)
   }
 }
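Note on ordering: both sinks are acquired inside a single Resource for-comprehension, so on shutdown they are finalized in reverse order of acquisition (bad first, then good). A minimal sketch of that semantics, using a stand-in handle that is illustrative rather than part of this commit:

    import cats.effect.{IO, Resource}
    import cats.effect.unsafe.implicits.global

    // Illustrative stand-in for a sink handle; prints on acquire/release.
    def sink(name: String): Resource[IO, String] =
      Resource.make(IO.println(s"acquire $name").as(name))(n => IO.println(s"release $n"))

    val sinks: Resource[IO, (String, String)] =
      for {
        good <- sink("good")
        bad  <- sink("bad")
      } yield (good, bad)

    // Prints: acquire good, acquire bad, use good/bad, release bad, release good
    sinks.use { case (g, b) => IO.println(s"use $g/$b") }.unsafeRunSync()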
SqsSink.scala
@@ -12,6 +12,11 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
 
+import cats.effect.{Resource, Sync}
+import cats.implicits.catsSyntaxMonadErrorRethrow
+
+import org.slf4j.LoggerFactory
+
 import java.nio.ByteBuffer
 import java.util.UUID
 import java.util.concurrent.ScheduledExecutorService
@@ -35,18 +40,20 @@ import com.amazonaws.auth.{
 import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
 import com.amazonaws.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry}
 
-import com.snowplowanalytics.snowplow.collectors.scalastream.model._
+import com.snowplowanalytics.snowplow.collector.core.{Config, Sink}
 
-class SqsSink private (
+class SqsSink[F[_]: Sync] private (
   val maxBytes: Int,
   client: AmazonSQS,
-  sqsConfig: Sqs,
-  bufferConfig: BufferConfig,
+  sqsConfig: SqsSinkConfig,
+  bufferConfig: Config.Buffer,
   queueName: String,
   executorService: ScheduledExecutorService
-) extends Sink {
+) extends Sink[F] {
   import SqsSink._
 
+  private lazy val log = LoggerFactory.getLogger(getClass())
+
   private val ByteThreshold: Long   = bufferConfig.byteLimit
   private val RecordThreshold: Long = bufferConfig.recordLimit
   private val TimeThreshold: Long   = bufferConfig.timeLimit
@@ -62,10 +69,10 @@
     concurrent.ExecutionContext.fromExecutorService(executorService)
 
   @volatile private var sqsHealthy: Boolean = false
-  override def isHealthy: Boolean = sqsHealthy
+  override def isHealthy: F[Boolean] = Sync[F].pure(sqsHealthy)
 
-  override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
-    events.foreach(e => EventStorage.store(e, key))
+  override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
+    Sync[F].delay(events.foreach(e => EventStorage.store(e, key)))
 
   object EventStorage {
     private val storedEvents = ListBuffer.empty[Events]
@@ -281,18 +288,36 @@ object SqsSink {
   // Details about why messages failed to be written to SQS.
   final case class BatchResultErrorInfo(code: String, message: String)
 
+  def create[F[_]: Sync](
+    maxBytes: Int,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
+    queueName: String,
+    executorService: ScheduledExecutorService
+  ): Resource[F, SqsSink[F]] = {
+    val acquire =
+      Sync[F]
+        .delay(
+          createAndInitialize(maxBytes, sqsConfig, bufferConfig, queueName, executorService)
+        )
+        .rethrow
+    val release = (sink: SqsSink[F]) => Sync[F].delay(sink.shutdown())
+
+    Resource.make(acquire)(release)
+  }
+
   /**
    * Create an SqsSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the SqsSink
   * during its construction.
    */
-  def createAndInitialize(
+  def createAndInitialize[F[_]: Sync](
     maxBytes: Int,
-    sqsConfig: Sqs,
-    bufferConfig: BufferConfig,
+    sqsConfig: SqsSinkConfig,
+    bufferConfig: Config.Buffer,
     queueName: String,
     executorService: ScheduledExecutorService
-  ): Either[Throwable, SqsSink] = {
+  ): Either[Throwable, SqsSink[F]] = {
     val client = for {
       provider <- getProvider(sqsConfig.aws)
       client   <- createSqsClient(provider, sqsConfig.region)
@@ -307,7 +332,7 @@
   }
 
   /** Create an aws credentials provider through env variables and iam. */
-  private def getProvider(awsConfig: AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
+  private def getProvider(awsConfig: SqsSinkConfig.AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
     def isDefault(key: String): Boolean = key == "default"
     def isIam(key: String): Boolean     = key == "iam"
     def isEnv(key: String): Boolean     = key == "env"
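The new create method brackets the pre-existing Either-based initializer: acquisition suspends createAndInitialize in Sync[F].delay and rethrows a Left into F's error channel, while release ties sink.shutdown() to the Resource lifecycle. The same pattern in isolation, with a hypothetical Handle standing in for SqsSink:

    import cats.effect.{Resource, Sync}
    import cats.implicits._

    // Hypothetical handle with an Either-based constructor, mirroring SqsSink.
    final class Handle private (val name: String) {
      def shutdown(): Unit = ()
    }

    object Handle {
      def open(name: String): Either[Throwable, Handle] =
        if (name.nonEmpty) Right(new Handle(name))
        else Left(new IllegalArgumentException("empty queue name"))

      def resource[F[_]: Sync](name: String): Resource[F, Handle] =
        Resource.make(
          Sync[F].delay(open(name)).rethrow // a Left is raised as an error in F
        )(h => Sync[F].delay(h.shutdown())) // release runs on exit, even on error
    }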
SqsSinkConfig.scala (new file)
@@ -0,0 +1,25 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
+
+import io.circe.Decoder
+import io.circe.generic.semiauto._
+
+import com.snowplowanalytics.snowplow.collector.core.Config
+
+final case class SqsSinkConfig(
+  maxBytes: Int,
+  region: String,
+  backoffPolicy: SqsSinkConfig.BackoffPolicyConfig,
+  aws: SqsSinkConfig.AWSConfig,
+  threadPoolSize: Int
+) extends Config.Sink
+
+object SqsSinkConfig {
+  final case class AWSConfig(accessKey: String, secretKey: String)
+
+  final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)
+
+  implicit val configDecoder: Decoder[SqsSinkConfig]              = deriveDecoder[SqsSinkConfig]
+  implicit val backoffPolicyDecoder: Decoder[BackoffPolicyConfig] = deriveDecoder[BackoffPolicyConfig]
+  implicit val awsConfigDecoder: Decoder[AWSConfig]               = deriveDecoder[AWSConfig]
+
+}
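Because the decoders are derived in the companion object, the sink section can be decoded straight from JSON. A quick check with circe-parser; the JSON literal below is illustrative and mirrors the values expected by the spec further down:

    import io.circe.parser.decode

    val json =
      """{
        |  "maxBytes": 192000,
        |  "region": "eu-central-1",
        |  "backoffPolicy": { "minBackoff": 500, "maxBackoff": 1500, "maxRetries": 3 },
        |  "aws": { "accessKey": "iam", "secretKey": "iam" },
        |  "threadPoolSize": 10
        |}""".stripMargin

    // Right(SqsSinkConfig(192000,eu-central-1,...)) when the shape matches
    println(decode[SqsSinkConfig](json))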
SqsConfigSpec.scala
@@ -18,8 +18,115 @@
  */
 package com.snowplowanalytics.snowplow.collectors.scalastream
 
-import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec
+import cats.effect.testing.specs2.CatsEffect
+import cats.effect.{ExitCode, IO}
+import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser}
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSinkConfig
+import org.http4s.SameSite
+import org.specs2.mutable.Specification
 
-class SqsConfigSpec extends ConfigSpec {
-  makeConfigTest("sqs", "", "")
-}
+import java.nio.file.Paths
+import scala.concurrent.duration.DurationInt
+
+class SqsConfigSpec extends Specification with CatsEffect {
+
+  "Config parser" should {
+    "be able to parse extended sqs config" in {
+      assert(
+        resource       = "/config.sqs.extended.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+    "be able to parse minimal sqs config" in {
+      assert(
+        resource       = "/config.sqs.minimal.hocon",
+        expectedResult = Right(SqsConfigSpec.expectedConfig)
+      )
+    }
+  }
+
+  private def assert(resource: String, expectedResult: Either[ExitCode, Config[SqsSinkConfig]]) = {
+    val path = Paths.get(getClass.getResource(resource).toURI)
+    ConfigParser.fromPath[IO, SqsSinkConfig](Some(path)).value.map { result =>
+      result must beEqualTo(expectedResult)
+    }
+  }
+}
+
+object SqsConfigSpec {
+
+  private val expectedConfig = Config[SqsSinkConfig](
+    interface = "0.0.0.0",
+    port      = 8080,
+    paths     = Map.empty[String, String],
+    p3p = Config.P3P(
+      policyRef = "/w3c/p3p.xml",
+      CP        = "NOI DSP COR NID PSA OUR IND COM NAV STA"
+    ),
+    crossDomain = Config.CrossDomain(
+      enabled = false,
+      domains = List("*"),
+      secure  = true
+    ),
+    cookie = Config.Cookie(
+      enabled        = true,
+      expiration     = 365.days,
+      name           = "sp",
+      domains        = List.empty,
+      fallbackDomain = None,
+      secure         = true,
+      httpOnly       = true,
+      sameSite       = Some(SameSite.None)
+    ),
+    doNotTrackCookie = Config.DoNotTrackCookie(
+      enabled = false,
+      name    = "",
+      value   = ""
+    ),
+    cookieBounce = Config.CookieBounce(
+      enabled                 = false,
+      name                    = "n3pc",
+      fallbackNetworkUserId   = "00000000-0000-4000-A000-000000000000",
+      forwardedProtocolHeader = None
+    ),
+    redirectMacro = Config.RedirectMacro(
+      enabled     = false,
+      placeholder = None
+    ),
+    rootResponse = Config.RootResponse(
+      enabled    = false,
+      statusCode = 302,
+      headers    = Map.empty[String, String],
+      body       = ""
+    ),
+    cors = Config.CORS(1.hour),
+    monitoring =
+      Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
+    ssl                   = Config.SSL(enable = false, redirect = false, port = 443),
+    enableDefaultRedirect = false,
+    redirectDomains       = Set.empty,
+    preTerminationPeriod  = 10.seconds,
+    streams = Config.Streams(
+      good                       = "good",
+      bad                        = "bad",
+      useIpAddressAsPartitionKey = false,
+      buffer = Config.Buffer(
+        byteLimit   = 3145728,
+        recordLimit = 500,
+        timeLimit   = 5000
+      ),
+      sink = SqsSinkConfig(
+        maxBytes = 192000,
+        region   = "eu-central-1",
+        backoffPolicy = SqsSinkConfig.BackoffPolicyConfig(
+          minBackoff = 500,
+          maxBackoff = 1500,
+          maxRetries = 3
+        ),
+        aws            = SqsSinkConfig.AWSConfig("iam", "iam"),
+        threadPoolSize = 10
+      )
+    )
+  )
+}
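Assuming the module wiring from build.sbt above, the new spec can be run on its own with:

    sbt "sqs/testOnly com.snowplowanalytics.snowplow.collectors.scalastream.SqsConfigSpec"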
