Skip to content

Commit

Permalink
Wrap kinesis sink with effects
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Aug 24, 2023
1 parent cfdaa58 commit 85723b9
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 220 deletions.
12 changes: 7 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,18 @@ lazy val http4s = project
.configs(IntegrationTest)

lazy val kinesisSettings =
allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
moduleName := "snowplow-stream-collector-kinesis",
buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
Docker / packageName := "scala-stream-collector-kinesis",
libraryDependencies ++= Seq(
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.kinesis,
Dependencies.Libraries.sts,
Dependencies.Libraries.sqs,
// integration tests dependencies
Dependencies.Libraries.LegacyIT.specs2,
Dependencies.Libraries.LegacyIT.specs2CE
Dependencies.Libraries.IT.specs2,
Dependencies.Libraries.IT.specs2CE,
),
IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
Expand All @@ -174,15 +176,15 @@ lazy val kinesisSettings =
lazy val kinesis = project
.settings(kinesisSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile;it->it")
.dependsOn(http4s % "test->test;compile->compile;it->it")
.configs(IntegrationTest)

lazy val kinesisDistroless = project
.in(file("distroless/kinesis"))
.settings(sourceDirectory := (kinesis / sourceDirectory).value)
.settings(kinesisSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile;it->it")
.dependsOn(http4s % "test->test;compile->compile;it->it")
.configs(IntegrationTest)

lazy val sqsSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,5 @@ object Config {
implicit val ssl = deriveDecoder[SSL]
deriveDecoder[Config[SinkConfig]]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import scala.concurrent.duration._

import cats.effect.testing.specs2.CatsIO

import cats.effect.IO
import cats.effect.testing.specs2.CatsEffect
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import org.http4s.{Header, SameSite}

import org.specs2.mutable.Specification
import org.typelevel.ci.CIStringSyntax

import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import scala.concurrent.duration._

class CookieSpec extends Specification with Localstack with CatsIO {
class CookieSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand Down Expand Up @@ -65,14 +62,14 @@ class CookieSpec extends Specification with Localstack with CatsIO {

for {
resp <- Http.response(request)
now <- ioTimer.clock.realTime(SECONDS)
now <- IO.realTime
} yield {
resp.cookies match {
case List(cookie) =>
cookie.name must beEqualTo(name)
cookie.expires match {
case Some(expiry) =>
expiry.epochSecond should beCloseTo(now + expiration.toSeconds, 10)
expiry.epochSecond should beCloseTo((now + expiration).toSeconds, 10)
case None =>
ko(s"Cookie [$cookie] doesn't contain the expiry date")
}
Expand Down Expand Up @@ -134,7 +131,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
additionalConfig = Some(mkConfig())
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("SP-Anonymous", "*"))
.withHeaders(Header.Raw(ci"SP-Anonymous", "*"))

for {
resp <- Http.response(request)
Expand All @@ -157,7 +154,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
additionalConfig = Some(mkConfig())
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("Origin", "http://my.domain"))
.withHeaders(Header.Raw(ci"Origin", "http://my.domain"))

for {
resp <- Http.response(request)
Expand Down Expand Up @@ -187,7 +184,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
))
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("Origin", s"http://$subDomain"))
.withHeaders(Header.Raw(ci"Origin", s"http://$subDomain"))

for {
resp <- Http.response(request)
Expand Down Expand Up @@ -220,7 +217,7 @@ class CookieSpec extends Specification with Localstack with CatsIO {
))
).use { collector =>
val request1 = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(Header("Origin", s"http://other.domain"))
.withHeaders(Header.Raw(ci"Origin", s"http://other.domain"))
val request2 = EventGenerator.mkTp2Event(collector.host, collector.port)

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,17 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import scala.concurrent.duration._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import cats.effect.testing.specs2.CatsEffect
import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import org.http4s.{Method, Request, Uri}
import org.specs2.mutable.Specification

import org.http4s.{Request, Method, Uri}

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import scala.concurrent.duration._

class CustomPathsSpec extends Specification with Localstack with CatsIO {
class CustomPathsSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,17 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import scala.concurrent.duration._
import scala.collection.JavaConverters._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import cats.effect.testing.specs2.CatsEffect
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import org.specs2.mutable.Specification

import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import scala.collection.JavaConverters._
import scala.concurrent.duration._

class DoNotTrackCookieSpec extends Specification with Localstack with CatsIO {
class DoNotTrackCookieSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,17 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import scala.concurrent.duration._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import cats.effect.testing.specs2.CatsEffect
import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import org.http4s.{Method, Request, Uri}
import org.specs2.mutable.Specification

import org.http4s.{Request, Method, Uri}

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import scala.concurrent.duration._

class HealthEndpointSpec extends Specification with Localstack with CatsIO {
class HealthEndpointSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,19 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.core

import java.net.InetAddress

import scala.concurrent.duration._

import cats.data.NonEmptyList

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import org.specs2.mutable.Specification

import cats.effect.testing.specs2.CatsEffect
import com.comcast.ip4s.IpAddress
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import org.http4s.headers.`X-Forwarded-For`
import org.specs2.mutable.Specification

import com.snowplowanalytics.snowplow.collectors.scalastream.it.Http
import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.Kinesis
import scala.concurrent.duration._

class XForwardedForSpec extends Specification with Localstack with CatsIO {
class XForwardedForSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand All @@ -44,7 +36,7 @@ class XForwardedForSpec extends Specification with Localstack with CatsIO {
val streamGood = s"${testName}-raw"
val streamBad = s"${testName}-bad-1"

val ip = InetAddress.getByName("123.123.123.123")
val ip = IpAddress.fromString("123.123.123.123")

Collector.container(
"kinesis/src/it/resources/collector.hocon",
Expand All @@ -53,7 +45,7 @@ class XForwardedForSpec extends Specification with Localstack with CatsIO {
streamBad
).use { collector =>
val request = EventGenerator.mkTp2Event(collector.host, collector.port)
.withHeaders(`X-Forwarded-For`(NonEmptyList.one(Some(ip))))
.withHeaders(`X-Forwarded-For`(NonEmptyList.one(ip)))

for {
_ <- Http.status(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,18 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis

import scala.concurrent.duration._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import org.http4s.{Request, Method, Uri, Status}

import org.specs2.mutable.Specification

import org.testcontainers.containers.GenericContainer

import cats.effect.testing.specs2.CatsEffect
import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}
import org.http4s.{Method, Request, Status, Uri}
import org.specs2.mutable.Specification
import org.testcontainers.containers.GenericContainer

import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers._
import scala.concurrent.duration._

class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
class KinesisCollectorSpec extends Specification with Localstack with CatsEffect {

override protected val Timeout = 5.minutes

Expand All @@ -46,7 +40,7 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
s"${testName}-raw",
s"${testName}-bad-1"
).use { collector =>
IO(collector.container.getLogs() must contain(("REST interface bound to")))
IO(collector.container.getLogs() must contain(("Service bound to address")))
}
}

Expand Down Expand Up @@ -99,7 +93,7 @@ class KinesisCollectorSpec extends Specification with Localstack with CatsIO {
_ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout)
} yield {
container.isRunning() must beFalse
container.getLogs() must contain("Server terminated")
container.getLogs() must contain("Closing NIO1 channel")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containers

import org.testcontainers.containers.BindMode
import org.testcontainers.containers.wait.strategy.Wait

import com.dimafeng.testcontainers.GenericContainer

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.collectors.scalastream.generated.ProjectMetadata

import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.dimafeng.testcontainers.GenericContainer
import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.wait.strategy.Wait

object Collector {

Expand All @@ -40,7 +36,7 @@ object Collector {
additionalConfig: Option[String] = None
): Resource[IO, CollectorContainer] = {
val container = GenericContainer(
dockerImage = s"snowplow/scala-stream-collector-kinesis:${ProjectMetadata.dockerTag}",
dockerImage = BuildInfo.dockerAlias,
env = Map(
"AWS_ACCESS_KEY_ID" -> "whatever",
"AWS_SECRET_ACCESS_KEY" -> "whatever",
Expand All @@ -50,7 +46,8 @@ object Collector {
"REGION" -> Localstack.region,
"KINESIS_ENDPOINT" -> Localstack.privateEndpoint,
"MAX_BYTES" -> maxBytes.toString,
"JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink=warn"
"JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink=warn",
"HTTP4S_BACKEND" -> "BLAZE"
) ++ configParameters(additionalConfig),
exposedPorts = Seq(port),
fileSystemBind = Seq(
Expand All @@ -64,7 +61,7 @@ object Collector {
"--config",
"/snowplow/config/collector.hocon"
),
waitStrategy = Wait.forLogMessage(s".*REST interface bound to.*", 1)
waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1)
)
container.container.withNetwork(Localstack.network)

Expand Down
Loading

0 comments on commit 85723b9

Please sign in to comment.