diff --git a/build.sbt b/build.sbt index 25984eb2c..313dbdfef 100644 --- a/build.sbt +++ b/build.sbt @@ -42,8 +42,14 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.specs2, // Integration tests Dependencies.Libraries.testcontainersIt, + Dependencies.Libraries.http4sClient, Dependencies.Libraries.http4sClientIt, - Dependencies.Libraries.catsRetryIt + Dependencies.Libraries.catsRetry, + Dependencies.Libraries.catsRetryIt, + Dependencies.Libraries.doobie, + Dependencies.Libraries.snowflakeJdbc, + Dependencies.Libraries.http4sCirce, + Dependencies.Libraries.circeLiteral ) lazy val commonExclusions = Seq( diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/E2EScenarios.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/E2EScenarios.scala new file mode 100644 index 000000000..c5ee25d69 --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/E2EScenarios.scala @@ -0,0 +1,89 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.e2e + +import cats.effect.{ContextShift, IO, Resource, Timer} +import com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage.StorageTarget +import doobie.implicits._ +import io.circe.Json +import io.circe.literal.JsonStringContext +import org.http4s._ +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.circe._ +import org.http4s.client.Client +import org.specs2.mutable.Specification +import retry.{RetryDetails, RetryPolicies, retryingOnFailures} + +import java.util.UUID +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt + +abstract class E2EScenarios extends Specification with StorageTarget { + skipAllIf(anyEnvironmentVariableMissing()) + + private lazy val collectorHostEnv = "TEST_COLLECTOR_HOST" + private lazy val timeout = 5.minutes + + implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val ioTimer: Timer[IO] = IO.timer(ExecutionContext.global) + + "Scenario 1" in { + val appId = s"e2e-test-${UUID.randomUUID()}" + val collectorPayload = + json""" + { + "schema": "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4", + "data": [ + { + "e": "pv", + "tv": "e2e-test", + "aid": $appId, + "p": "web" + } + ] + }""" + + val request = buildTP2CollectorRequest(collectorPayload) + val expectedDbCount = 1 + + val e2eScenario = for { + collectorResponse <- executeHttpRequest(request) + actualDbCount <- waitUntilAllDataReachDB(appId, expectedDbCount) + } yield { + collectorResponse.status.code must beEqualTo(200) + actualDbCount must beEqualTo(expectedDbCount) + } + + e2eScenario.unsafeRunSync() + } + + private def buildTP2CollectorRequest(payload: Json): Request[IO] = { + val uri = Uri.unsafeFromString(s"${System.getenv(collectorHostEnv)}/com.snowplowanalytics.snowplow/tp2") + Request[IO](Method.POST, uri).withEntity(payload) + } + + private def executeHttpRequest(request: Request[IO]): IO[Response[IO]] = + createClient.use(client => client.run(request).use(resp => IO.pure(resp))) + + private def waitUntilAllDataReachDB(appId: String, expectedDbCount: Int): IO[Long] = + retryingOnFailures[Long]( + policy = RetryPolicies.capDelay[IO](timeout, RetryPolicies.constantDelay[IO](10.seconds)), + wasSuccessful = actualDbCount => actualDbCount == expectedDbCount, + onFailure = (_, retryDetails) => IO.delay(println(renderRetryDetails(retryDetails))) + )(countDataInDB(appId)) + + private def countDataInDB(appId: String) = + countEventsWithAppIdQuery(appId).query[Long].unique.transact(transactor) + + private def createClient: Resource[IO, Client[IO]] = + BlazeClientBuilder[IO](ExecutionContext.global).resource + + private def anyEnvironmentVariableMissing(): Boolean = + (collectorHostEnv :: storageEnvironmentVariables).exists(varName => System.getenv(varName) == null) + + private def renderRetryDetails(retryDetails: RetryDetails): String = + retryDetails match { + case RetryDetails.GivingUp(totalRetries, totalDelay) => + s"Giving up, number of retries - $totalRetries, totalDelay - $totalDelay" + case RetryDetails.WillDelayAndRetry(_, retriesSoFar, cumulativeDelay) => + s"Retrying database query, retries so far - $retriesSoFar, cumulative delay - $cumulativeDelay" + } +} diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/SnowflakeE2E.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/SnowflakeE2E.scala new file mode 100644 index 000000000..e5ed09bee --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/SnowflakeE2E.scala @@ -0,0 +1,14 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.e2e + +import com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage.SnowflakeSupport + +/** + * Following environment variables are required to run: + * - TEST_COLLECTOR_HOST + * - TEST_SNOWFLAKE_URL (format like: 'jdbc:snowflake://${accountName}.snowflakecomputing.com') + * - TEST_SNOWFLAKE_WAREHOUSE + * - TEST_SNOWFLAKE_DATABASE + * - TEST_SNOWFLAKE_USERNAME + * - TEST_SNOWFLAKE_PASSWORD + */ +class SnowflakeE2E extends E2EScenarios with SnowflakeSupport diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/SnowflakeSupport.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/SnowflakeSupport.scala new file mode 100644 index 000000000..96ff02c1b --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/SnowflakeSupport.scala @@ -0,0 +1,49 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage + +import cats.effect.{ContextShift, IO} +import SnowflakeSupport._ +import doobie.Transactor +import doobie.implicits._ +import doobie.util.fragment + +import java.util.Properties +import scala.concurrent.ExecutionContext + +trait SnowflakeSupport extends StorageTarget { + + override def transactor: Transactor[IO] = { + val props: Properties = new Properties() + props.put("warehouse", System.getenv(snowflakeWarehouseEnv)) + props.put("db", System.getenv(snowflakeDatabaseEnv)) + props.put("user", System.getenv(snowflakeUsernameEnv)) + props.put("password", System.getenv(snowflakePasswordEnv)) + props.put("timezone", "UTC") + + implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + Transactor.fromDriverManager[IO]( + driver = "net.snowflake.client.jdbc.SnowflakeDriver", + url = System.getenv(snowflakeUrlEnv), + props + ) + } + + override def countEventsWithAppIdQuery(appId: String): fragment.Fragment = + sql"""select count(*) from atomic.events where app_id = $appId""" + + override def storageEnvironmentVariables: List[String] = List( + snowflakeUrlEnv, + snowflakeWarehouseEnv, + snowflakeDatabaseEnv, + snowflakeUsernameEnv, + snowflakePasswordEnv + ) + +} + +object SnowflakeSupport { + val snowflakeUrlEnv = "TEST_SNOWFLAKE_URL" + val snowflakeWarehouseEnv = "TEST_SNOWFLAKE_WAREHOUSE" + val snowflakeDatabaseEnv = "TEST_SNOWFLAKE_DATABASE" + val snowflakeUsernameEnv = "TEST_SNOWFLAKE_USERNAME" + val snowflakePasswordEnv = "TEST_SNOWFLAKE_PASSWORD" +} diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/StorageTarget.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/StorageTarget.scala new file mode 100644 index 000000000..f5e9c1949 --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/e2e/storage/StorageTarget.scala @@ -0,0 +1,15 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage + +import cats.effect.IO +import doobie.util.fragment +import doobie.util.transactor.Transactor + +trait StorageTarget { + + def transactor: Transactor[IO] + + def countEventsWithAppIdQuery(appId: String): fragment.Fragment + + def storageEnvironmentVariables: List[String] + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6cb214c79..6873856f4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -92,10 +92,16 @@ object Dependencies { val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest val specs2CEIt = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest val testcontainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest + val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry % Test val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s % Test val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4s % IntegrationTest val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % V.akka % Test + val doobie = "org.tpolecat" %% "doobie-core" % "0.13.4" % Test + val snowflakeJdbc = "net.snowflake" % "snowflake-jdbc" % "3.13.30" % Test + val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s % Test + val circeLiteral = "io.circe" %% "circe-literal" % "0.14.1" % Test } }