Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 6, 2023
1 parent cfdaa58 commit c1474a9
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ target
src/main/resources/application.conf

.bsp

.localstack
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ lazy val nsqSettings =
lazy val nsq = project
.settings(nsqSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(core % "test->test;compile->compile;it->it")

lazy val nsqDistroless = project
.in(file("distroless/nsq"))
Expand Down
117 changes: 117 additions & 0 deletions nsq/src/it/nsq/Containers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq

import org.testcontainers.containers.{BindMode, Network}
import org.testcontainers.containers.wait.strategy.Wait
import com.dimafeng.testcontainers.GenericContainer
import cats.effect.{IO, Resource}
import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer

object Containers {

val collectorPort = 8080
val projectId = "google-project-id"
val emulatorHost = "localhost"
val emulatorPort = 8085
lazy val emulatorHostPort = pubSubEmulator.getMappedPort(emulatorPort)
val topicGood = "good"
val topicBad = "bad"

private val network = Network.newNetwork()

private val pubSubEmulator = {
val container = GenericContainer(
dockerImage = "gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators",
waitStrategy = Wait.forLogMessage(".*Server started.*", 1),
exposedPorts = Seq(emulatorPort),
command = Seq(
"gcloud",
"beta",
"emulators",
"pubsub",
"start",
s"--project=$projectId",
s"--host-port=0.0.0.0:$emulatorPort"
)
)

container.underlyingUnsafeContainer.withNetwork(network)
container.underlyingUnsafeContainer.withNetworkAliases("pubsub-emulator")
container.container
}

def collector(
configPath: String,
testName: String,
topicGood: String,
topicBad: String,
createTopics: Boolean = true,
envs: Map[String, String] = Map.empty[String, String]
): Resource[IO, CollectorContainer] = {
val container = GenericContainer(
dockerImage = BuildInfo.dockerAlias,
env = Map(
"PUBSUB_EMULATOR_HOST" -> s"pubsub-emulator:$emulatorPort",
"PORT" -> collectorPort.toString,
"TOPIC_GOOD" -> topicGood,
"TOPIC_BAD" -> topicBad,
"GOOGLE_PROJECT_ID" -> projectId,
"MAX_BYTES" -> Integer.MAX_VALUE.toString,
"JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink=warn",
"HTTP4S_BACKEND" -> "BLAZE"
) ++ envs,
exposedPorts = Seq(collectorPort),
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
configPath,
"/snowplow/config/collector.hocon",
BindMode.READ_ONLY
)
),
command = Seq(
"--config",
"/snowplow/config/collector.hocon"
)
,waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1)
)
container.container.withNetwork(network)

val create =
if(createTopics)
PubSub.createTopicsAndSubscriptions(
projectId,
emulatorHost,
emulatorHostPort,
List(topicGood, topicBad)
)
else
IO.unit

Resource.make (
create *>
IO(startContainerWithLogs(container.container, testName))
.map(c => CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort)))
)(
c => IO(c.container.stop())
)
}

def startEmulator(): Unit = pubSubEmulator.start()

def stopEmulator(): Unit = pubSubEmulator.stop()
}
180 changes: 180 additions & 0 deletions nsq/src/it/nsq/NsqCollectorSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq

import scala.concurrent.duration._
import cats.effect.IO
import org.http4s.{Method, Request, Status, Uri}
import cats.effect.testing.specs2.CatsEffect
import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll
import org.testcontainers.containers.GenericContainer
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}

class NsqCollectorSpec extends Specification with CatsEffect with BeforeAfterAll {

override protected val Timeout = 5.minutes

def beforeAll: Unit = Containers.startEmulator()

def afterAll: Unit = Containers.stopEmulator()

val stopTimeout = 20.second

val maxBytes = 10000

"nsq-dev" should {
"spin up conatiners" in {

"this" must contain("this")
}
}

/*
"collector-pubsub" should {
"be able to parse the minimal config" in {
val testName = "minimal"
Containers.collector(
"examples/config.pubsub.minimal.hocon",
testName,
"good",
"bad"
).use { collector =>
IO(collector.container.getLogs() must contain("Service bound to address"))
}
}
"emit the correct number of collector payloads and bad rows" in {
val testName = "count"
val nbGood = 1000
val nbBad = 10
val topicGood = s"${testName}-raw"
val topicBad = s"${testName}-bad-1"
Containers.collector(
"pubsub/src/it/resources/collector.hocon",
testName,
topicGood,
topicBad,
envs = Map("MAX_BYTES" -> maxBytes.toString)
).use { collector =>
for {
_ <- log(testName, "Sending data")
_ <- EventGenerator.sendEvents(
collector.host,
collector.port,
nbGood,
nbBad,
maxBytes
)
_ <- log(testName, "Data sent. Waiting for collector to work")
_ <- IO.sleep(5.second)
_ <- log(testName, "Consuming collector's output")
collectorOutput <- PubSub.consume(
Containers.projectId,
Containers.emulatorHost,
Containers.emulatorHostPort,
topicGood,
topicBad
)
_ <- printBadRows(testName, collectorOutput.bad)
} yield {
collectorOutput.good.size should beEqualTo(nbGood)
collectorOutput.bad.size should beEqualTo(nbBad)
}
}
}
s"shutdown within $stopTimeout when it receives a SIGTERM" in {
val testName = "stop"
Containers.collector(
"pubsub/src/it/resources/collector.hocon",
testName,
s"${testName}-raw",
s"${testName}-bad-1"
).use { collector =>
val container = collector.container
for {
_ <- log(testName, "Sending signal")
_ <- IO(container.getDockerClient().killContainerCmd(container.getContainerId()).withSignal("TERM").exec())
_ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout)
} yield {
container.isRunning() must beFalse
container.getLogs() must contain("Closing NIO1 channel")
}
}
}
"start with /sink-health unhealthy and insert pending events when topics become available" in {
val testName = "sink-health"
val nbGood = 10
val nbBad = 10
val topicGood = s"${testName}-raw"
val topicBad = s"${testName}-bad-1"
Containers.collector(
"pubsub/src/it/resources/collector.hocon",
testName,
topicGood,
topicBad,
createTopics = false,
envs = Map("MAX_BYTES" -> maxBytes.toString)
).use { collector =>
val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}/sink-health")
val request = Request[IO](Method.GET, uri)
for {
_ <- log(testName, "Checking /sink-health before creating the topics")
statusBeforeCreate <- Http.status(request)
_ <- log(testName, "Sending events before creating the topics")
_ <- EventGenerator.sendEvents(
collector.host,
collector.port,
nbGood,
nbBad,
maxBytes
)
_ <- log(testName, "Creating topics")
_ <- PubSub.createTopicsAndSubscriptions(
Containers.projectId,
Containers.emulatorHost,
Containers.emulatorHostPort,
List(topicGood, topicBad)
)
_ <- IO.sleep(10.second)
_ <- log(testName, "Checking /sink-health after creating the topics")
statusAfterCreate <- Http.status(request)
collectorOutput <- PubSub.consume(
Containers.projectId,
Containers.emulatorHost,
Containers.emulatorHostPort,
topicGood,
topicBad
)
_ <- printBadRows(testName, collectorOutput.bad)
} yield {
statusBeforeCreate should beEqualTo(Status.ServiceUnavailable)
statusAfterCreate should beEqualTo(Status.Ok)
collectorOutput.good.size should beEqualTo(nbGood)
collectorOutput.bad.size should beEqualTo(nbBad)
}
}
}
}
*/
}

0 comments on commit c1474a9

Please sign in to comment.