Skip to content

Commit

Permalink
Snowflake E2E test scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jul 21, 2023
1 parent d9ee385 commit ba51725
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 1 deletion.
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.specs2,
// Integration tests
Dependencies.Libraries.testcontainersIt,
Dependencies.Libraries.http4sClient,
Dependencies.Libraries.http4sClientIt,
Dependencies.Libraries.catsRetryIt
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.catsRetryIt,
Dependencies.Libraries.doobie,
Dependencies.Libraries.snowflakeJdbc,
Dependencies.Libraries.http4sCirce,
Dependencies.Libraries.circeLiteral
)

lazy val commonExclusions = Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.e2e

import cats.effect.{ContextShift, IO, Resource, Timer}
import com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage.StorageTarget
import doobie.implicits._
import io.circe.Json
import io.circe.literal.JsonStringContext
import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.circe._
import org.http4s.client.Client
import org.specs2.mutable.Specification
import retry.{RetryDetails, RetryPolicies, retryingOnFailures}

import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

abstract class E2EScenarios extends Specification with StorageTarget {
skipAllIf(anyEnvironmentVariableMissing())

private lazy val collectorHostEnv = "TEST_COLLECTOR_HOST"
private lazy val timeout = 5.minutes

implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val ioTimer: Timer[IO] = IO.timer(ExecutionContext.global)

"Scenario 1" in {
val appId = s"e2e-test-${UUID.randomUUID()}"
val collectorPayload =
json"""
{
"schema": "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4",
"data": [
{
"e": "pv",
"tv": "e2e-test",
"aid": $appId,
"p": "web"
}
]
}"""

val request = buildTP2CollectorRequest(collectorPayload)
val expectedDbCount = 1

val e2eScenario = for {
collectorResponse <- executeHttpRequest(request)
actualDbCount <- waitUntilAllDataReachDB(appId, expectedDbCount)
} yield {
collectorResponse.status.code must beEqualTo(200)
actualDbCount must beEqualTo(expectedDbCount)
}

e2eScenario.unsafeRunSync()
}

private def buildTP2CollectorRequest(payload: Json): Request[IO] = {
val uri = Uri.unsafeFromString(s"${System.getenv(collectorHostEnv)}/com.snowplowanalytics.snowplow/tp2")
Request[IO](Method.POST, uri).withEntity(payload)
}

private def executeHttpRequest(request: Request[IO]): IO[Response[IO]] =
createClient.use(client => client.run(request).use(resp => IO.pure(resp)))

private def waitUntilAllDataReachDB(appId: String, expectedDbCount: Int): IO[Long] =
retryingOnFailures[Long](
policy = RetryPolicies.capDelay[IO](timeout, RetryPolicies.constantDelay[IO](10.seconds)),
wasSuccessful = actualDbCount => actualDbCount == expectedDbCount,
onFailure = (_, retryDetails) => IO.delay(println(renderRetryDetails(retryDetails)))
)(countDataInDB(appId))

private def countDataInDB(appId: String) =
countEventsWithAppIdQuery(appId).query[Long].unique.transact(transactor)

private def createClient: Resource[IO, Client[IO]] =
BlazeClientBuilder[IO](ExecutionContext.global).resource

private def anyEnvironmentVariableMissing(): Boolean =
(collectorHostEnv :: storageEnvironmentVariables).exists(varName => System.getenv(varName) == null)

private def renderRetryDetails(retryDetails: RetryDetails): String =
retryDetails match {
case RetryDetails.GivingUp(totalRetries, totalDelay) =>
s"Giving up, number of retries - $totalRetries, totalDelay - $totalDelay"
case RetryDetails.WillDelayAndRetry(_, retriesSoFar, cumulativeDelay) =>
s"Retrying database query, retries so far - $retriesSoFar, cumulative delay - $cumulativeDelay"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.e2e

import com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage.SnowflakeSupport

/**
* Following environment variables are required to run:
* - TEST_COLLECTOR_HOST
* - TEST_SNOWFLAKE_URL (format like: 'jdbc:snowflake://${accountName}.snowflakecomputing.com')
* - TEST_SNOWFLAKE_WAREHOUSE
* - TEST_SNOWFLAKE_DATABASE
* - TEST_SNOWFLAKE_USERNAME
* - TEST_SNOWFLAKE_PASSWORD
*/
class SnowflakeE2E extends E2EScenarios with SnowflakeSupport
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage

import cats.effect.{ContextShift, IO}
import SnowflakeSupport._
import doobie.Transactor
import doobie.implicits._
import doobie.util.fragment

import java.util.Properties
import scala.concurrent.ExecutionContext

trait SnowflakeSupport extends StorageTarget {

override def transactor: Transactor[IO] = {
val props: Properties = new Properties()
props.put("warehouse", System.getenv(snowflakeWarehouseEnv))
props.put("db", System.getenv(snowflakeDatabaseEnv))
props.put("user", System.getenv(snowflakeUsernameEnv))
props.put("password", System.getenv(snowflakePasswordEnv))
props.put("timezone", "UTC")

implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
Transactor.fromDriverManager[IO](
driver = "net.snowflake.client.jdbc.SnowflakeDriver",
url = System.getenv(snowflakeUrlEnv),
props
)
}

override def countEventsWithAppIdQuery(appId: String): fragment.Fragment =
sql"""select count(*) from atomic.events where app_id = $appId"""

override def storageEnvironmentVariables: List[String] = List(
snowflakeUrlEnv,
snowflakeWarehouseEnv,
snowflakeDatabaseEnv,
snowflakeUsernameEnv,
snowflakePasswordEnv
)

}

object SnowflakeSupport {
val snowflakeUrlEnv = "TEST_SNOWFLAKE_URL"
val snowflakeWarehouseEnv = "TEST_SNOWFLAKE_WAREHOUSE"
val snowflakeDatabaseEnv = "TEST_SNOWFLAKE_DATABASE"
val snowflakeUsernameEnv = "TEST_SNOWFLAKE_USERNAME"
val snowflakePasswordEnv = "TEST_SNOWFLAKE_PASSWORD"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.e2e.storage

import cats.effect.IO
import doobie.util.fragment
import doobie.util.transactor.Transactor

trait StorageTarget {

def transactor: Transactor[IO]

def countEventsWithAppIdQuery(appId: String): fragment.Fragment

def storageEnvironmentVariables: List[String]

}
6 changes: 6 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ object Dependencies {
val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest
val specs2CEIt = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest
val testcontainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry % Test
val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s % Test
val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4s % IntegrationTest
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test
val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test
val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % V.akka % Test
val doobie = "org.tpolecat" %% "doobie-core" % "0.13.4" % Test
val snowflakeJdbc = "net.snowflake" % "snowflake-jdbc" % "3.13.30" % Test
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s % Test
val circeLiteral = "io.circe" %% "circe-literal" % "0.14.1" % Test
}
}

0 comments on commit ba51725

Please sign in to comment.