diff --git a/.gitignore b/.gitignore index ee01cd718..e58fc4d9b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ target src/main/resources/application.conf .bsp + +.localstack diff --git a/build.sbt b/build.sbt index 6165766b7..3523447fe 100644 --- a/build.sbt +++ b/build.sbt @@ -270,7 +270,7 @@ lazy val nsqSettings = lazy val nsq = project .settings(nsqSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(core % "test->test;compile->compile;it->it") lazy val nsqDistroless = project .in(file("distroless/nsq")) diff --git a/nsq/src/it/nsq/Containers.scala b/nsq/src/it/nsq/Containers.scala new file mode 100644 index 000000000..fc55efd11 --- /dev/null +++ b/nsq/src/it/nsq/Containers.scala @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq + +import org.testcontainers.containers.{BindMode, Network} +import org.testcontainers.containers.wait.strategy.Wait +import com.dimafeng.testcontainers.GenericContainer +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer + +object Containers { + + val collectorPort = 8080 + val projectId = "google-project-id" + val emulatorHost = "localhost" + val emulatorPort = 8085 + lazy val emulatorHostPort = pubSubEmulator.getMappedPort(emulatorPort) + val topicGood = "good" + val topicBad = "bad" + + private val network = Network.newNetwork() + + private val pubSubEmulator = { + val container = GenericContainer( + dockerImage = "gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators", + waitStrategy = Wait.forLogMessage(".*Server started.*", 1), + exposedPorts = Seq(emulatorPort), + command = Seq( + "gcloud", + "beta", + "emulators", + "pubsub", + "start", + s"--project=$projectId", + s"--host-port=0.0.0.0:$emulatorPort" + ) + ) + + container.underlyingUnsafeContainer.withNetwork(network) + container.underlyingUnsafeContainer.withNetworkAliases("pubsub-emulator") + container.container + } + + def collector( + configPath: String, + testName: String, + topicGood: String, + topicBad: String, + createTopics: Boolean = true, + envs: Map[String, String] = Map.empty[String, String] + ): Resource[IO, CollectorContainer] = { + val container = GenericContainer( + dockerImage = BuildInfo.dockerAlias, + env = Map( + "PUBSUB_EMULATOR_HOST" -> s"pubsub-emulator:$emulatorPort", + "PORT" -> collectorPort.toString, + "TOPIC_GOOD" -> topicGood, + "TOPIC_BAD" -> topicBad, + "GOOGLE_PROJECT_ID" -> projectId, + "MAX_BYTES" -> Integer.MAX_VALUE.toString, + "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink=warn", + "HTTP4S_BACKEND" -> "BLAZE" + ) ++ envs, + exposedPorts = Seq(collectorPort), + fileSystemBind = Seq( + GenericContainer.FileSystemBind( + configPath, + "/snowplow/config/collector.hocon", + BindMode.READ_ONLY + ) + ), + command = Seq( + "--config", + "/snowplow/config/collector.hocon" + ) + ,waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1) + ) + container.container.withNetwork(network) + + val create = + if(createTopics) + PubSub.createTopicsAndSubscriptions( + projectId, + emulatorHost, + emulatorHostPort, + List(topicGood, topicBad) + ) + else + IO.unit + + Resource.make ( + create *> + IO(startContainerWithLogs(container.container, testName)) + .map(c => CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort))) + )( + c => IO(c.container.stop()) + ) + } + + def startEmulator(): Unit = pubSubEmulator.start() + + def stopEmulator(): Unit = pubSubEmulator.stop() +} diff --git a/nsq/src/it/nsq/NsqCollectorSpec.scala b/nsq/src/it/nsq/NsqCollectorSpec.scala new file mode 100644 index 000000000..d9c0d5959 --- /dev/null +++ b/nsq/src/it/nsq/NsqCollectorSpec.scala @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq + +import scala.concurrent.duration._ +import cats.effect.IO +import org.http4s.{Method, Request, Status, Uri} +import cats.effect.testing.specs2.CatsEffect +import org.specs2.mutable.Specification +import org.specs2.specification.BeforeAfterAll +import org.testcontainers.containers.GenericContainer +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} + +class NsqCollectorSpec extends Specification with CatsEffect with BeforeAfterAll { + + override protected val Timeout = 5.minutes + + def beforeAll: Unit = Containers.startEmulator() + + def afterAll: Unit = Containers.stopEmulator() + + val stopTimeout = 20.second + + val maxBytes = 10000 + + "nsq-dev" should { + "spin up conatiners" in { + + "this" must contain("this") + } + } + + /* + "collector-pubsub" should { + "be able to parse the minimal config" in { + val testName = "minimal" + + Containers.collector( + "examples/config.pubsub.minimal.hocon", + testName, + "good", + "bad" + ).use { collector => + IO(collector.container.getLogs() must contain("Service bound to address")) + } + } + + "emit the correct number of collector payloads and bad rows" in { + val testName = "count" + val nbGood = 1000 + val nbBad = 10 + val topicGood = s"${testName}-raw" + val topicBad = s"${testName}-bad-1" + + Containers.collector( + "pubsub/src/it/resources/collector.hocon", + testName, + topicGood, + topicBad, + envs = Map("MAX_BYTES" -> maxBytes.toString) + ).use { collector => + for { + _ <- log(testName, "Sending data") + _ <- EventGenerator.sendEvents( + collector.host, + collector.port, + nbGood, + nbBad, + maxBytes + ) + _ <- log(testName, "Data sent. Waiting for collector to work") + _ <- IO.sleep(5.second) + _ <- log(testName, "Consuming collector's output") + collectorOutput <- PubSub.consume( + Containers.projectId, + Containers.emulatorHost, + Containers.emulatorHostPort, + topicGood, + topicBad + ) + _ <- printBadRows(testName, collectorOutput.bad) + } yield { + collectorOutput.good.size should beEqualTo(nbGood) + collectorOutput.bad.size should beEqualTo(nbBad) + } + } + } + + s"shutdown within $stopTimeout when it receives a SIGTERM" in { + val testName = "stop" + + Containers.collector( + "pubsub/src/it/resources/collector.hocon", + testName, + s"${testName}-raw", + s"${testName}-bad-1" + ).use { collector => + val container = collector.container + for { + _ <- log(testName, "Sending signal") + _ <- IO(container.getDockerClient().killContainerCmd(container.getContainerId()).withSignal("TERM").exec()) + _ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout) + } yield { + container.isRunning() must beFalse + container.getLogs() must contain("Closing NIO1 channel") + } + } + } + + "start with /sink-health unhealthy and insert pending events when topics become available" in { + val testName = "sink-health" + val nbGood = 10 + val nbBad = 10 + val topicGood = s"${testName}-raw" + val topicBad = s"${testName}-bad-1" + + Containers.collector( + "pubsub/src/it/resources/collector.hocon", + testName, + topicGood, + topicBad, + createTopics = false, + envs = Map("MAX_BYTES" -> maxBytes.toString) + ).use { collector => + val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}/sink-health") + val request = Request[IO](Method.GET, uri) + + for { + _ <- log(testName, "Checking /sink-health before creating the topics") + statusBeforeCreate <- Http.status(request) + _ <- log(testName, "Sending events before creating the topics") + _ <- EventGenerator.sendEvents( + collector.host, + collector.port, + nbGood, + nbBad, + maxBytes + ) + _ <- log(testName, "Creating topics") + _ <- PubSub.createTopicsAndSubscriptions( + Containers.projectId, + Containers.emulatorHost, + Containers.emulatorHostPort, + List(topicGood, topicBad) + ) + _ <- IO.sleep(10.second) + _ <- log(testName, "Checking /sink-health after creating the topics") + statusAfterCreate <- Http.status(request) + collectorOutput <- PubSub.consume( + Containers.projectId, + Containers.emulatorHost, + Containers.emulatorHostPort, + topicGood, + topicBad + ) + _ <- printBadRows(testName, collectorOutput.bad) + } yield { + statusBeforeCreate should beEqualTo(Status.ServiceUnavailable) + statusAfterCreate should beEqualTo(Status.Ok) + collectorOutput.good.size should beEqualTo(nbGood) + collectorOutput.bad.size should beEqualTo(nbBad) + } + } + } + } + */ +}