diff --git a/build.sbt b/build.sbt index ec9d730b5..247b77e63 100644 --- a/build.sbt +++ b/build.sbt @@ -41,9 +41,9 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.akkaStreamTestkit, Dependencies.Libraries.specs2, // Integration tests - Dependencies.Libraries.testcontainersIt, - Dependencies.Libraries.http4sClientIt, - Dependencies.Libraries.catsRetryIt + Dependencies.Libraries.LegacyIT.testcontainers, + Dependencies.Libraries.LegacyIT.http4sClient, + Dependencies.Libraries.LegacyIT.catsRetry ) lazy val commonExclusions = Seq( @@ -142,9 +142,17 @@ lazy val http4s = project Dependencies.Libraries.decline, Dependencies.Libraries.circeGeneric, Dependencies.Libraries.circeConfig, - Dependencies.Libraries.specs2CE3 + Dependencies.Libraries.specs2, + + //Integration tests + Dependencies.Libraries.IT.testcontainers, + Dependencies.Libraries.IT.http4sClient, + Dependencies.Libraries.IT.catsRetry + ) ) + .settings(Defaults.itSettings) + .configs(IntegrationTest) lazy val kinesisSettings = allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( @@ -155,8 +163,8 @@ lazy val kinesisSettings = Dependencies.Libraries.sts, Dependencies.Libraries.sqs, // integration tests dependencies - Dependencies.Libraries.specs2It, - Dependencies.Libraries.specs2CEIt + Dependencies.Libraries.LegacyIT.specs2, + Dependencies.Libraries.LegacyIT.specs2CE ), IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value, IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated @@ -199,15 +207,16 @@ lazy val sqsDistroless = project .dependsOn(core % "test->test;compile->compile") lazy val pubsubSettings = - allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq( moduleName := "snowplow-stream-collector-google-pubsub", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", Docker / packageName := "scala-stream-collector-pubsub", libraryDependencies ++= Seq( - Dependencies.Libraries.pubsub, - Dependencies.Libraries.protobuf, + Dependencies.Libraries.catsRetry, + Dependencies.Libraries.fs2PubSub, // integration tests dependencies - Dependencies.Libraries.specs2It, - Dependencies.Libraries.specs2CEIt, + Dependencies.Libraries.IT.specs2, + Dependencies.Libraries.IT.specs2CE, ), IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value, IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated @@ -216,7 +225,7 @@ lazy val pubsubSettings = lazy val pubsub = project .settings(pubsubSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile;it->it") + .dependsOn(http4s % "test->test;compile->compile;it->it") .configs(IntegrationTest) lazy val pubsubDistroless = project @@ -224,7 +233,7 @@ lazy val pubsubDistroless = project .settings(sourceDirectory := (pubsub / sourceDirectory).value) .settings(pubsubSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile;it->it") + .dependsOn(http4s % "test->test;compile->compile;it->it") .configs(IntegrationTest) lazy val kafkaSettings = @@ -272,12 +281,12 @@ lazy val nsqDistroless = project lazy val stdoutSettings = allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( 
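Note: the http4s module above picks up integration tests the same way the legacy modules do, through sbt's built-in `it` configuration. A minimal sketch of that wiring, with an illustrative project name and a single example dependency (the `Docker / publishLocal` hook assumes sbt-native-packager's Docker plugin, which this build already uses):

lazy val example = project
  .enablePlugins(JavaAppPackaging, DockerPlugin)
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)
  .settings(
    libraryDependencies += "org.specs2" %% "specs2-core" % "4.11.0" % IntegrationTest,
    // publish the Docker image locally before running the integration tests
    IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value
  )

Tests then live under src/it/scala and run with `sbt example/IntegrationTest/test`.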
moduleName := "snowplow-stream-collector-stdout", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout", Docker / packageName := "scala-stream-collector-stdout" ) lazy val stdout = project .settings(stdoutSettings) - .settings(buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout") .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) .dependsOn(http4s % "test->test;compile->compile") diff --git a/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorContainer.scala b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorContainer.scala new file mode 100644 index 000000000..0ec85bd9d --- /dev/null +++ b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorContainer.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it + +import org.testcontainers.containers.GenericContainer + +case class CollectorContainer( + container: GenericContainer[_], + host: String, + port: Int +) \ No newline at end of file diff --git a/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorOutput.scala b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorOutput.scala new file mode 100644 index 000000000..88f098bf4 --- /dev/null +++ b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorOutput.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it + +import com.snowplowanalytics.snowplow.badrows.BadRow + +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload + +case class CollectorOutput( + good: List[CollectorPayload], + bad: List[BadRow] +) \ No newline at end of file diff --git a/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/EventGenerator.scala b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/EventGenerator.scala new file mode 100644 index 000000000..6f7cbdaed --- /dev/null +++ b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/EventGenerator.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it + +import cats.effect.IO + +import org.http4s.{Method, Request, Uri} + +object EventGenerator { + + def sendEvents( + collectorHost: String, + collectorPort: Int, + nbGood: Int, + nbBad: Int, + maxBytes: Int + ): IO[Unit] = { + val requests = generateEvents(collectorHost, collectorPort, nbGood, nbBad, maxBytes) + Http.statuses(requests) + .flatMap { responses => + responses.collect { case resp if resp.code != 200 => resp.reason } match { + case Nil => IO.unit + case errors => IO.raiseError(new RuntimeException(s"${errors.size} requests were not successful. Example error: ${errors.head}")) + } + } + } + + def generateEvents( + collectorHost: String, + collectorPort: Int, + nbGood: Int, + nbBad: Int, + maxBytes: Int + ): List[Request[IO]] = { + val good = List.fill(nbGood)(mkTp2Event(collectorHost, collectorPort, valid = true, maxBytes)) + val bad = List.fill(nbBad)(mkTp2Event(collectorHost, collectorPort, valid = false, maxBytes)) + good ++ bad + } + + def mkTp2Event( + collectorHost: String, + collectorPort: Int, + valid: Boolean = true, + maxBytes: Int = 100 + ): Request[IO] = { + val uri = Uri.unsafeFromString(s"http://$collectorHost:$collectorPort/com.snowplowanalytics.snowplow/tp2") + val body = if (valid) "foo" else "a" * (maxBytes + 1) + Request[IO](Method.POST, uri).withEntity(body) + } +} \ No newline at end of file diff --git a/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/Http.scala b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/Http.scala new file mode 100644 index 000000000..f0a453a50 --- /dev/null +++ b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/Http.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. 
You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it + +import cats.effect.{IO, Resource} +import cats.implicits._ +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.client.Client +import org.http4s.{Request, Response, Status} + +object Http { + + def statuses(requests: List[Request[IO]]): IO[List[Status]] = + mkClient.use { client => requests.traverse(client.status) } + + def status(request: Request[IO]): IO[Status] = + mkClient.use { client => client.status(request) } + + def response(request: Request[IO]): IO[Response[IO]] = + mkClient.use(c => c.run(request).use(resp => IO.pure(resp))) + + def responses(requests: List[Request[IO]]): IO[List[Response[IO]]] = + mkClient.use(c => requests.traverse(r => c.run(r).use(resp => IO.pure(resp)))) + + def mkClient: Resource[IO, Client[IO]] = + BlazeClientBuilder.apply[IO].resource +} \ No newline at end of file diff --git a/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/utils.scala b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/utils.scala new file mode 100644 index 000000000..d6c32fa0b --- /dev/null +++ b/http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/utils.scala @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. 
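Note: `Http.response` and `Http.responses` above hand the `Response` back after the `use` block has released the underlying connection, so only fields already in memory (status, reason, headers) are safe to read afterwards; the ITs respect that. A sketch of a variant that consumes the body while the resource is still open:

import cats.effect.IO
import org.http4s.{Request, Status}

def statusAndBody(request: Request[IO]): IO[(Status, String)] =
  Http.mkClient.use { client =>
    client.run(request).use(resp => resp.as[String].map(body => (resp.status, body)))
  }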
+ */ +package com.snowplowanalytics.snowplow.collectors.scalastream.it + +import scala.concurrent.duration._ + +import org.apache.thrift.TDeserializer + +import org.slf4j.LoggerFactory + +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.output.Slf4jLogConsumer + +import io.circe.{Json, parser} + +import cats.implicits._ + +import cats.effect.IO + +import retry.syntax.all._ +import retry.RetryPolicies + +import com.snowplowanalytics.snowplow.badrows.BadRow + +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload + +object utils { + + def parseCollectorPayload(bytes: Array[Byte]): CollectorPayload = { + val deserializer = new TDeserializer() + val target = new CollectorPayload() + deserializer.deserialize(target, bytes) + target + } + + def parseBadRow(bytes: Array[Byte]): BadRow = { + val str = new String(bytes) + val parsed = for { + json <- parser.parse(str).leftMap(_.message) + sdj <- SelfDescribingData.parse(json).leftMap(_.message("Can't decode JSON as SDJ")) + br <- sdj.data.as[BadRow].leftMap(_.getMessage()) + } yield br + parsed match { + case Right(br) => br + case Left(err) => throw new RuntimeException(s"Can't parse bad row. Error: $err") + } + } + + def printBadRows(testName: String, badRows: List[BadRow]): IO[Unit] = { + log(testName, "Bad rows:") *> + badRows.traverse_(br => log(testName, br.compact)) + } + + def log(testName: String, line: String): IO[Unit] = + IO(println(s"[$testName] $line")) + + def startContainerWithLogs( + container: GenericContainer[_], + loggerName: String + ): GenericContainer[_] = { + container.start() + val logger = LoggerFactory.getLogger(loggerName) + val logs = new Slf4jLogConsumer(logger) + container.followOutput(logs) + container + } + + def waitWhile[A]( + a: A, + condition: A => Boolean, + maxDelay: FiniteDuration + ): IO[Boolean] = { + val retryPolicy = RetryPolicies.limitRetriesByCumulativeDelay( + maxDelay, + RetryPolicies.capDelay[IO]( + 2.second, + RetryPolicies.fullJitter[IO](1.second) + ) + ) + + IO(condition(a)).retryingOnFailures( + result => IO(!result), + retryPolicy, + (_, _) => IO.unit + ) + } + + /** Return a list of config parameters from a raw JSON string. 
*/ + def getConfigParameters(config: String): List[String] = { + val parsed: Json = parser.parse(config).valueOr { case failure => + throw new IllegalArgumentException("Can't parse JSON", failure.underlying) + } + + def flatten(value: Json): Option[List[(String, Json)]] = + value.asObject.map( + _.toList.flatMap { + case (k, v) => flatten(v) match { + case None => List(k -> v) + case Some(fields) => fields.map { + case (innerK, innerV) => s"$k.$innerK" -> innerV + } + } + } + ) + + def withSpaces(s: String): String = if(s.contains(" ")) s""""$s"""" else s + + val fields = flatten(parsed).getOrElse(throw new IllegalArgumentException("Couldn't flatten fields")) + + fields.flatMap { + case (k, v) if v.isString => + List(s"-D$k=${withSpaces(v.asString.get)}") + case (k, v) if v.isArray => + v.asArray.get.toList.zipWithIndex.map { + case (s, i) if s.isString => + s"-D$k.$i=${withSpaces(s.asString.get)}" + case (other, i) => + s"-D$k.$i=${withSpaces(other.toString)}" + } + case (k, v) => + List(s"-D$k=${withSpaces(v.toString)}") + } + } +} diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf index b4c83f3f9..f7cc3def0 100644 --- a/http4s/src/main/resources/reference.conf +++ b/http4s/src/main/resources/reference.conf @@ -46,7 +46,7 @@ collector { } cors { - accessControlMaxAge = 60 seconds + accessControlMaxAge = 60 minutes } streams { @@ -78,6 +78,7 @@ collector { } enableDefaultRedirect = false + preTerminationPeriod = 10 seconds redirectDomains = [] diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala index df25ac885..cb69be2f9 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -1,6 +1,6 @@ package com.snowplowanalytics.snowplow.collector.core -import cats.effect.{ExitCode, IO, Sync} +import cats.effect.{ExitCode, IO} import cats.effect.kernel.Resource import com.monovore.decline.effect.CommandIOApp @@ -17,7 +17,7 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo) version = appInfo.version ) { - def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] + def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]] final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks) } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala index c2960ba8d..2c12b7b09 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala @@ -67,7 +67,7 @@ object ConfigParser { } private def loadAll(config: TypesafeConfig): TypesafeConfig = - namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.load()))))) + namespaced(config.withFallback(namespaced(ConfigFactory.load()))) private def namespaced(config: TypesafeConfig): TypesafeConfig = { val namespace = "collector" diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index f7adec6cf..3f9cad421 100644 --- 
a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -14,6 +14,14 @@ class Routes[F[_]: Sync](enableDefaultRedirect: Boolean, service: IService[F]) e private val healthRoutes = HttpRoutes.of[F] { case GET -> Root / "health" => Ok("OK") + case GET -> Root / "sink-health" => + service + .sinksHealthy + .ifM( + ifTrue = Ok("OK"), + ifFalse = ServiceUnavailable("Service Unavailable") + ) + } private val corsRoute = HttpRoutes.of[F] { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index 74ac6f6ee..ad3373cea 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -34,6 +34,7 @@ trait IService[F[_]] { contentType: Option[String] = None ): F[Response[F]] def determinePath(vendor: String, version: String): String + def sinksHealthy: F[Boolean] } object Service { @@ -113,6 +114,8 @@ class Service[F[_]: Sync]( ) } yield resp + override def sinksHealthy: F[Boolean] = (sinks.good.isHealthy, sinks.bad.isHealthy).mapN(_ && _) + override def determinePath(vendor: String, version: String): String = { val original = s"/$vendor/$version" config.paths.getOrElse(original, original) @@ -392,4 +395,5 @@ class Service[F[_]: Sync]( case Some(_) => Some(Service.spAnonymousNuid) case None => request.uri.query.params.get("nuid").orElse(requestCookie.map(_.content)) } + } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala index 97c9dd847..8485aa83d 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala @@ -55,6 +55,8 @@ class RoutesSpec extends Specification { } override def determinePath(vendor: String, version: String): String = "/p1/p2" + + override def sinksHealthy: IO[Boolean] = IO.pure(true) } def createTestServices(enabledDefaultRedirect: Boolean = true) = { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c8585670c..2e40a84b1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -54,13 +54,19 @@ object Dependencies { val decline = "2.4.1" val circe = "0.14.1" val circeConfig = "0.10.0" + val fs2PubSub = "0.22.0" + val catsRetry = "3.1.0" + // Scala (test only) val specs2 = "4.11.0" - val specs2CE = "0.4.1" - val specs2CE3 = "1.5.0" + val specs2CE = "1.5.0" val testcontainers = "0.40.10" - val catsRetry = "2.1.0" - val http4sIT = "0.21.33" + + object LegacyIT { + val specs2CE = "0.4.1" + val catsRetry = "2.1.0" + val http4s = "0.21.33" + } } object Libraries { @@ -97,24 +103,42 @@ object Dependencies { val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats // http4s - val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s - val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s - val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze - val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty - val decline = "com.monovore" %% "decline-effect" % V.decline - val circeGeneric = "io.circe" %% "circe-generic" % V.circe - val circeConfig = "io.circe" %% "circe-config" % V.circeConfig + 
val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s + val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s + val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze + val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val decline = "com.monovore" %% "decline-effect" % V.decline + val circeGeneric = "io.circe" %% "circe-generic" % V.circe + val circeConfig = "io.circe" %% "circe-config" % V.circeConfig + val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry + val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub // Scala (test only) - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val specs2CE3 = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE3 % Test - val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest - val specs2CEIt = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest - val testcontainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest - val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest - val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4sIT % IntegrationTest - val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test - val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test - val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % V.akka % Test + + // Test common + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + + // Test Akka + val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test + val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test + val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % V.akka % Test + + // Integration tests + object IT { + val testcontainers = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest + val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest + val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze % IntegrationTest + } + + // Integration test legacy + object LegacyIT { + val testcontainers = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest + val specs2CE = "com.codecommit" %% "cats-effect-testing-specs2" % V.LegacyIT.specs2CE % IntegrationTest + val catsRetry = "com.github.cb372" %% "cats-retry" % V.LegacyIT.catsRetry % IntegrationTest + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.LegacyIT.http4s % IntegrationTest + } } } diff --git a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/Containers.scala b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/Containers.scala index 6f91b9297..c8b472508 100644 --- a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/Containers.scala +++ b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/Containers.scala @@ -14,25 +14,20 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream.it.pubsub -import scala.concurrent.ExecutionContext - import org.testcontainers.containers.{BindMode, Network} import 
org.testcontainers.containers.wait.strategy.Wait import com.dimafeng.testcontainers.GenericContainer -import cats.effect.{IO, Resource, Timer} +import cats.effect.{IO, Resource} -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.ProjectMetadata +import com.snowplowanalytics.snowplow.collectors.scalastream.ProjectMetadata import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer object Containers { - private val executionContext: ExecutionContext = ExecutionContext.global - implicit val ioTimer: Timer[IO] = IO.timer(executionContext) - val collectorPort = 8080 val projectId = "google-project-id" val emulatorHost = "localhost" @@ -81,7 +76,8 @@ object Containers { "TOPIC_BAD" -> topicBad, "GOOGLE_PROJECT_ID" -> projectId, "MAX_BYTES" -> Integer.MAX_VALUE.toString, - "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink=warn" + "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink=warn", + "HTTP4S_BACKEND" -> "BLAZE" ) ++ envs, exposedPorts = Seq(collectorPort), fileSystemBind = Seq( @@ -95,7 +91,7 @@ object Containers { "--config", "/snowplow/config/collector.hocon" ) - ,waitStrategy = Wait.forLogMessage(s".*REST interface bound to.*", 1) + ,waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1) ) container.container.withNetwork(network) diff --git a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/GooglePubSubCollectorSpec.scala b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/GooglePubSubCollectorSpec.scala index d1943eeb3..5423824c6 100644 --- a/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/GooglePubSubCollectorSpec.scala +++ b/pubsub/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/pubsub/GooglePubSubCollectorSpec.scala @@ -15,22 +15,16 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.it.pubsub import scala.concurrent.duration._ - import cats.effect.IO - -import org.http4s.{Request, Method, Uri, Status} - -import cats.effect.testing.specs2.CatsIO - +import org.http4s.{Method, Request, Status, Uri} +import cats.effect.testing.specs2.CatsEffect import org.specs2.mutable.Specification import org.specs2.specification.BeforeAfterAll - import org.testcontainers.containers.GenericContainer - import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http} -class GooglePubSubCollectorSpec extends Specification with CatsIO with BeforeAfterAll { +class GooglePubSubCollectorSpec extends Specification with CatsEffect with BeforeAfterAll { override protected val Timeout = 5.minutes @@ -52,7 +46,7 @@ class GooglePubSubCollectorSpec extends Specification with CatsIO with BeforeAft "good", "bad" ).use { collector => - IO(collector.container.getLogs() must contain(("REST interface bound to"))) + IO(collector.container.getLogs() must contain("Service bound to address")) } } @@ -113,7 +107,7 @@ class GooglePubSubCollectorSpec extends Specification with CatsIO with BeforeAft _ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout) } yield { container.isRunning() must beFalse - container.getLogs() must contain("Server terminated") + container.getLogs() must contain("Closing NIO1 
channel") } } } diff --git a/pubsub/src/main/resources/application.conf b/pubsub/src/main/resources/application.conf index b96335869..12817af73 100644 --- a/pubsub/src/main/resources/application.conf +++ b/pubsub/src/main/resources/application.conf @@ -26,26 +26,4 @@ collector { timeLimit = 1000 } } -} - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} +} \ No newline at end of file diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala deleted file mode 100644 index 0a79ee614..000000000 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, and - * you may not use this file except in compliance with the Apache License - * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the Apache License Version 2.0 is distributed on an "AS - * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. 
- */ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import cats.syntax.either._ -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService - -object GooglePubSubCollector extends Collector { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion - - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks: Either[Throwable, CollectorSinks] = for { - pc <- collectorConf.streams.sink match { - case pc: GooglePubSub => pc.asRight - case _ => new IllegalArgumentException("Configured sink is not PubSub").asLeft - } - goodStream = collectorConf.streams.good - badStream = collectorConf.streams.bad - bufferConf = collectorConf.streams.buffer - good <- GooglePubSubSink.createAndInitialize( - pc.maxBytes, - pc, - bufferConf, - goodStream - ) - bad <- GooglePubSubSink.createAndInitialize( - pc.maxBytes, - pc, - bufferConf, - badStream - ) - } yield CollectorSinks(good, bad) - - sinks match { - case Right(s) => run(collectorConf, akkaConf, s, telemetry) - case Left(e) => throw e - } - } -} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala new file mode 100644 index 000000000..6a1648ca6 --- /dev/null +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala @@ -0,0 +1,16 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect._ +import cats.effect.kernel.Resource +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{PubSubSink, PubSubSinkConfig} + +object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { + + override def mkSinks(config: Config.Streams[PubSubSinkConfig]): Resource[IO, Sinks[IO]] = + for { + good <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.good) + bad <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.bad) + } yield Sinks(good, bad) +} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/BuilderOps.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/BuilderOps.scala new file mode 100644 index 000000000..0290bef24 --- /dev/null +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/BuilderOps.scala @@ -0,0 +1,36 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import com.google.api.gax.core.NoCredentialsProvider +import com.google.api.gax.grpc.GrpcTransportChannel +import com.google.api.gax.rpc.FixedTransportChannelProvider +import com.google.cloud.pubsub.v1.{Publisher, TopicAdminSettings} +import io.grpc.ManagedChannelBuilder + +object BuilderOps { + + implicit class PublisherBuilderOps(val builder: Publisher.Builder) extends AnyVal { + def setProvidersForEmulator(): Publisher.Builder = + 
customEmulatorHost().fold(builder) { emulatorHost => + builder + .setChannelProvider(createCustomChannelProvider(emulatorHost)) + .setCredentialsProvider(NoCredentialsProvider.create()) + } + } + + implicit class TopicAdminBuilderOps(val builder: TopicAdminSettings.Builder) extends AnyVal { + def setProvidersForEmulator(): TopicAdminSettings.Builder = + customEmulatorHost().fold(builder) { emulatorHost => + builder + .setTransportChannelProvider(createCustomChannelProvider(emulatorHost)) + .setCredentialsProvider(NoCredentialsProvider.create()) + } + } + + private def customEmulatorHost(): Option[String] = + sys.env.get("PUBSUB_EMULATOR_HOST") + + private def createCustomChannelProvider(emulatorHost: String): FixedTransportChannelProvider = { + val channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build() + FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)) + } +} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala deleted file mode 100644 index eb12a78c2..000000000 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
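Note: `BuilderOps` above centralises the PUBSUB_EMULATOR_HOST override that the deleted sink below wired up by hand. How it is meant to be applied when building a publisher, as a sketch with illustrative project and topic names:

import com.google.cloud.pubsub.v1.Publisher
import com.google.pubsub.v1.TopicName
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._

val publisher: Publisher = Publisher
  .newBuilder(TopicName.of("my-project", "my-topic"))
  .setProvidersForEmulator() // no-op unless PUBSUB_EMULATOR_HOST is set
  .build()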
- */ -package com.snowplowanalytics.snowplow.collectors.scalastream -package sinks - -import java.util.concurrent.Executors - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer -import scala.util._ -import scala.concurrent.duration.{FiniteDuration, MILLISECONDS} -import scala.concurrent.duration._ - -import org.threeten.bp.Duration - -import com.google.api.core.{ApiFutureCallback, ApiFutures} -import com.google.api.gax.batching.BatchingSettings -import com.google.api.gax.retrying.RetrySettings -import com.google.api.gax.core.{CredentialsProvider, NoCredentialsProvider} -import com.google.api.gax.grpc.GrpcTransportChannel -import com.google.api.gax.rpc.{ - ApiException, - FixedHeaderProvider, - FixedTransportChannelProvider, - TransportChannelProvider -} -import com.google.cloud.pubsub.v1.{Publisher, TopicAdminClient, TopicAdminSettings} -import com.google.pubsub.v1.{ProjectName, ProjectTopicName, PubsubMessage, TopicName} -import com.google.protobuf.ByteString - -import io.grpc.ManagedChannelBuilder - -import cats.syntax.either._ - -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ - -class GooglePubSubSink private ( - val maxBytes: Int, - publisher: Publisher, - projectId: String, - topicName: String, - retryInterval: FiniteDuration -) extends Sink { - private val logExecutor = Executors.newSingleThreadExecutor() - // 2 = 1 for health check + 1 for retrying failed inserts - private val scheduledExecutor = Executors.newScheduledThreadPool(2) - - private val failedInsertsBuffer = ListBuffer.empty[Array[Byte]] - - @volatile private var pubsubHealthy: Boolean = false - override def isHealthy: Boolean = pubsubHealthy - - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - if (events.nonEmpty) { - log.info(s"Writing ${events.size} records to PubSub topic $topicName") - - events.foreach { event => - publisher.asRight.map { p => - val future = p.publish(eventToPubsubMessage(event)) - ApiFutures.addCallback( - future, - new ApiFutureCallback[String]() { - override def onSuccess(messageId: String): Unit = { - pubsubHealthy = true - log.debug(s"Successfully published event with id $messageId to $topicName") - } - - override def onFailure(throwable: Throwable): Unit = { - pubsubHealthy = false - throwable match { - case apiEx: ApiException => - val retryable = if (apiEx.isRetryable()) "retryable" else "non-retryable" - log.error( - s"Publishing message to $topicName failed with code ${apiEx.getStatusCode} and $retryable error: ${apiEx.getMessage}" - ) - case t => log.error(s"Publishing message to $topicName failed with error: ${t.getMessage}") - } - failedInsertsBuffer.synchronized { - failedInsertsBuffer.prepend(event) - } - } - }, - logExecutor - ) - } - } - } - - override def shutdown(): Unit = { - publisher.shutdown() - scheduledExecutor.shutdown() - scheduledExecutor.awaitTermination(10000, MILLISECONDS) - () - } - - /** - * Convert event bytes to a PubsubMessage to be published - * @param event Event to be converted - * @return a PubsubMessage - */ - private def eventToPubsubMessage(event: Array[Byte]): PubsubMessage = - PubsubMessage.newBuilder.setData(ByteString.copyFrom(event)).build() - - private def retryRunnable: Runnable = new Runnable { - override def run() { - val failedInserts = failedInsertsBuffer.synchronized { - val records = failedInsertsBuffer.toList - failedInsertsBuffer.clear() - records - } - if (failedInserts.nonEmpty) { - log.info(s"Retrying to insert ${failedInserts.size} records into 
$topicName") - storeRawEvents(failedInserts, "NOT USED") - } - } - } - scheduledExecutor.scheduleWithFixedDelay(retryRunnable, retryInterval.toMillis, retryInterval.toMillis, MILLISECONDS) - - private def checkPubsubHealth( - customProviders: Option[(TransportChannelProvider, CredentialsProvider)], - startupCheckInterval: FiniteDuration - ): Unit = { - val healthRunnable = new Runnable { - override def run() { - val topicAdmin = GooglePubSubSink.createTopicAdmin(customProviders) - - while (!pubsubHealthy) { - GooglePubSubSink.topicExists(topicAdmin, projectId, topicName) match { - case Right(true) => - log.info(s"Topic $topicName exists") - pubsubHealthy = true - case Right(false) => - log.error(s"Topic $topicName doesn't exist") - Thread.sleep(startupCheckInterval.toMillis) - case Left(err) => - log.error(s"Error while checking if topic $topicName exists: ${err.getCause()}") - Thread.sleep(startupCheckInterval.toMillis) - } - } - - Either.catchNonFatal(topicAdmin.close()) match { - case Right(_) => - case Left(err) => - log.error(s"Error when closing topicAdmin: ${err.getMessage()}") - } - } - } - scheduledExecutor.execute(healthRunnable) - } -} - -/** GooglePubSubSink companion object with factory method */ -object GooglePubSubSink { - def createAndInitialize( - maxBytes: Int, - googlePubSubConfig: GooglePubSub, - bufferConfig: BufferConfig, - topicName: String - ): Either[Throwable, GooglePubSubSink] = - for { - batching <- batchingSettings(bufferConfig).asRight - retry = retrySettings(googlePubSubConfig.backoffPolicy) - customProviders = sys.env.get("PUBSUB_EMULATOR_HOST").map { hostPort => - val channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build() - val channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)) - val credentialsProvider = NoCredentialsProvider.create() - (channelProvider, credentialsProvider) - } - publisher <- createPublisher(googlePubSubConfig.googleProjectId, topicName, batching, retry, customProviders) - sink = new GooglePubSubSink( - maxBytes, - publisher, - googlePubSubConfig.googleProjectId, - topicName, - googlePubSubConfig.retryInterval - ) - _ = sink.checkPubsubHealth(customProviders, googlePubSubConfig.startupCheckInterval) - } yield sink - - private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}" - - /** - * Instantiates a Publisher on a topic with the given configuration options. - * This can fail if the publisher can't be created. 
- * @return a PubSub publisher or an error - */ - private def createPublisher( - projectId: String, - topicName: String, - batchingSettings: BatchingSettings, - retrySettings: RetrySettings, - customProviders: Option[(TransportChannelProvider, CredentialsProvider)] - ): Either[Throwable, Publisher] = { - val builder = Publisher - .newBuilder(ProjectTopicName.of(projectId, topicName)) - .setBatchingSettings(batchingSettings) - .setRetrySettings(retrySettings) - .setHeaderProvider(FixedHeaderProvider.create("User-Agent", UserAgent)) - customProviders.foreach { - case (channelProvider, credentialsProvider) => - builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider) - } - Either.catchNonFatal(builder.build()).leftMap(e => new RuntimeException("Couldn't build PubSub publisher", e)) - } - - private def batchingSettings(bufferConfig: BufferConfig): BatchingSettings = - BatchingSettings - .newBuilder() - .setElementCountThreshold(bufferConfig.recordLimit) - .setRequestByteThreshold(bufferConfig.byteLimit) - .setDelayThreshold(Duration.ofMillis(bufferConfig.timeLimit)) - .build() - - /** Defaults are used for the rpc configuration, see Publisher.java */ - private def retrySettings(backoffPolicy: GooglePubSubBackoffPolicyConfig): RetrySettings = - RetrySettings - .newBuilder() - .setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff)) - .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff)) - .setRetryDelayMultiplier(backoffPolicy.multiplier) - .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff)) - .setInitialRpcTimeout(Duration.ofMillis(backoffPolicy.initialRpcTimeout)) - .setRpcTimeoutMultiplier(backoffPolicy.rpcTimeoutMultiplier) - .setMaxRpcTimeout(Duration.ofMillis(backoffPolicy.maxRpcTimeout)) - .build() - - private def createTopicAdmin( - customProviders: Option[(TransportChannelProvider, CredentialsProvider)] - ): TopicAdminClient = - customProviders match { - case Some((channelProvider, credentialsProvider)) => - TopicAdminClient.create( - TopicAdminSettings - .newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .build() - ) - case None => - TopicAdminClient.create() - } - - private def topicExists( - topicAdmin: TopicAdminClient, - projectId: String, - topicName: String - ): Either[Throwable, Boolean] = - Either - .catchNonFatal(topicAdmin.listTopics(ProjectName.of(projectId))) - .leftMap(new RuntimeException(s"Can't list topics", _)) - .map(_.iterateAll.asScala.toList.map(_.getName())) - .flatMap { topics => - topics.contains(TopicName.of(projectId, topicName).toString()).asRight - } -} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala new file mode 100644 index 000000000..07940c3c0 --- /dev/null +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubHealthCheck.scala @@ -0,0 +1,74 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.effect.implicits.genSpawnOps +import cats.effect.{Async, Ref, Resource, Sync} +import cats.implicits._ +import com.google.cloud.pubsub.v1.{TopicAdminClient, TopicAdminSettings} +import com.google.pubsub.v1.{ProjectName, TopicName} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._ +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger 
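Note: PubSubHealthCheck, whose imports begin here, and PubSubSink both use the log4cats idiom of deriving a logger from `Sync` via an implicit def, so every `F`-generic method can call `Logger[F]`. Reduced to a self-contained sketch:

import cats.effect.{IO, Sync}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object LoggingExample {
  implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

  val hello: IO[Unit] = Logger[IO].info("sink initialised")
}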
+ +import scala.collection.JavaConverters._ +import scala.util._ + +object PubSubHealthCheck { + + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def run[F[_]: Async]( + isHealthyState: Ref[F, Boolean], + sinkConfig: PubSubSinkConfig, + topicName: String + ): Resource[F, Unit] = + for { + topicAdminClient <- createTopicAdminClient[F]() + healthCheckTask = createHealthCheckTask[F](topicAdminClient, isHealthyState, sinkConfig, topicName) + _ <- repeatInBackgroundUntilHealthy(isHealthyState, sinkConfig, healthCheckTask) + } yield () + + private def repeatInBackgroundUntilHealthy[F[_]: Async]( + isHealthyState: Ref[F, Boolean], + sinkConfig: PubSubSinkConfig, + healthCheckTask: F[Unit] + ): Resource[F, Unit] = { + val checkThenSleep = healthCheckTask *> Async[F].sleep(sinkConfig.startupCheckInterval) + checkThenSleep.untilM_(isHealthyState.get).background.void + } + + private def createHealthCheckTask[F[_]: Async]( + topicAdminClient: TopicAdminClient, + isHealthyState: Ref[F, Boolean], + sinkConfig: PubSubSinkConfig, + topicName: String + ): F[Unit] = + topicExists(topicAdminClient, sinkConfig.googleProjectId, topicName).flatMap { + case Right(true) => + Logger[F].info(s"Topic $topicName exists") *> isHealthyState.set(true) + case Right(false) => + Logger[F].error(s"Topic $topicName doesn't exist") + case Left(err) => + Logger[F].error(s"Error while checking if topic $topicName exists: ${err.getCause}") + } + + private def createTopicAdminClient[F[_]: Sync](): Resource[F, TopicAdminClient] = { + val builder = TopicAdminSettings.newBuilder().setProvidersForEmulator().build() + Resource.make(Sync[F].delay(TopicAdminClient.create(builder)))(client => Sync[F].delay(client.close())) + } + + private def topicExists[F[_]: Sync]( + topicAdmin: TopicAdminClient, + projectId: String, + topicName: String + ): F[Either[Throwable, Boolean]] = Sync[F].delay { + Either + .catchNonFatal(topicAdmin.listTopics(ProjectName.of(projectId))) + .leftMap(new RuntimeException(s"Can't list topics", _)) + .map(_.iterateAll.asScala.toList.map(_.getName())) + .flatMap { topics => + topics.contains(TopicName.of(projectId, topicName).toString).asRight + } + } + +} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala new file mode 100644 index 000000000..9f0263724 --- /dev/null +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
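Note: `repeatInBackgroundUntilHealthy` above runs the check on a background fiber that is cancelled when the owning Resource is released, and stops looping once the Ref flips to true. The same shape in isolation, as a sketch:

import cats.effect.{IO, Ref, Resource}
import cats.syntax.all._
import scala.concurrent.duration._

def untilHealthy(isHealthy: Ref[IO, Boolean], check: IO[Unit]): Resource[IO, Unit] =
  (check *> IO.sleep(1.second))
    .untilM_(isHealthy.get) // re-run check + sleep until the Ref reads true
    .background             // fork as a fiber tied to the Resource lifetime
    .void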
+ */ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.Parallel +import cats.effect.implicits.genSpawnOps +import cats.effect.{Async, Ref, Resource, Sync} +import cats.implicits._ +import com.google.api.gax.retrying.RetrySettings +import com.google.api.gax.rpc.{ApiException, FixedHeaderProvider} +import com.permutive.pubsub.producer.Model.{ProjectId, Topic} +import com.permutive.pubsub.producer.encoder.MessageEncoder +import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig} +import com.permutive.pubsub.producer.{Model, PubsubProducer} +import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} +import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._ +import org.threeten.bp.Duration +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import retry.RetryPolicies +import retry.syntax.all._ + +import scala.concurrent.duration.{DurationLong, FiniteDuration} +import scala.util._ + +class PubSubSink[F[_]: Async: Parallel: Logger] private ( + override val maxBytes: Int, + isHealthyState: Ref[F, Boolean], + producer: PubsubProducer[F, Array[Byte]], + retryInterval: FiniteDuration, + topicName: String +) extends Sink[F] { + + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + produceBatch(events).start.void + + override def isHealthy: F[Boolean] = isHealthyState.get + + private def produceBatch(events: List[Array[Byte]]): F[Unit] = + events.parTraverse_ { event => + produceSingleEvent(event) *> isHealthyState.set(true) + } + + private def produceSingleEvent(event: Array[Byte]): F[Model.MessageId] = + producer + .produce(event) + .retryingOnAllErrors( + policy = RetryPolicies.constantDelay(retryInterval), + onError = (error, _) => handlePublishError(error) + ) + + private def handlePublishError(error: Throwable): F[Unit] = + isHealthyState.set(false) *> Logger[F].error(createErrorMessage(error)) + + private def createErrorMessage(error: Throwable): String = + error match { + case apiEx: ApiException => + val retryable = if (apiEx.isRetryable) "retryable" else "non-retryable" + s"Publishing message to $topicName failed with code ${apiEx.getStatusCode} and $retryable error: ${apiEx.getMessage}" + case throwable => s"Publishing message to $topicName failed with error: ${throwable.getMessage}" + } +} + +object PubSubSink { + private val UserAgent = s"snowplow/stream-collector-${BuildInfo.version}" + + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + implicit val byteArrayEncoder: MessageEncoder[Array[Byte]] = + new MessageEncoder[Array[Byte]] { + def encode(a: Array[Byte]): Either[Throwable, Array[Byte]] = + a.asRight + } + + def create[F[_]: Async: Parallel]( + maxBytes: Int, + sinkConfig: PubSubSinkConfig, + bufferConfig: Config.Buffer, + topicName: String + ): Resource[F, Sink[F]] = + for { + isHealthyState <- Resource.eval(Ref.of[F, Boolean](false)) + producer <- createProducer[F](sinkConfig, topicName, bufferConfig) + _ <- PubSubHealthCheck.run(isHealthyState, sinkConfig, topicName) + } yield new PubSubSink( + maxBytes, + isHealthyState, + producer, + sinkConfig.retryInterval, + topicName + ) + + private def createProducer[F[_]: Async: Parallel]( + sinkConfig: PubSubSinkConfig, + topicName: String, + bufferConfig: Config.Buffer + ): Resource[F, PubsubProducer[F, Array[Byte]]] = { + val config = PubsubProducerConfig[F]( + 
batchSize = bufferConfig.recordLimit, + requestByteThreshold = Some(bufferConfig.byteLimit), + delayThreshold = bufferConfig.timeLimit.millis, + onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"), + customizePublisher = Some { + _.setRetrySettings(retrySettings(sinkConfig.backoffPolicy)) + .setHeaderProvider(FixedHeaderProvider.create("User-Agent", UserAgent)) + .setProvidersForEmulator() + } + ) + + GooglePubsubProducer.of[F, Array[Byte]](ProjectId(sinkConfig.googleProjectId), Topic(topicName), config) + } + + private def retrySettings(backoffPolicy: PubSubSinkConfig.BackoffPolicy): RetrySettings = + RetrySettings + .newBuilder() + .setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff)) + .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff)) + .setRetryDelayMultiplier(backoffPolicy.multiplier) + .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff)) + .setInitialRpcTimeout(Duration.ofMillis(backoffPolicy.initialRpcTimeout)) + .setRpcTimeoutMultiplier(backoffPolicy.rpcTimeoutMultiplier) + .setMaxRpcTimeout(Duration.ofMillis(backoffPolicy.maxRpcTimeout)) + .build() +} diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala new file mode 100644 index 000000000..d8c92955b --- /dev/null +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala @@ -0,0 +1,33 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import com.snowplowanalytics.snowplow.collector.core.Config +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig.BackoffPolicy +import io.circe.Decoder +import io.circe.config.syntax.durationDecoder +import io.circe.generic.semiauto._ + +import scala.concurrent.duration.FiniteDuration + +final case class PubSubSinkConfig( + maxBytes: Int, + googleProjectId: String, + backoffPolicy: BackoffPolicy, + startupCheckInterval: FiniteDuration, + retryInterval: FiniteDuration +) extends Config.Sink + +object PubSubSinkConfig { + + final case class BackoffPolicy( + minBackoff: Long, + maxBackoff: Long, + totalBackoff: Long, + multiplier: Double, + initialRpcTimeout: Long, + maxRpcTimeout: Long, + rpcTimeoutMultiplier: Double + ) + implicit val configDecoder: Decoder[PubSubSinkConfig] = deriveDecoder[PubSubSinkConfig] + implicit val backoffPolicyConfigDecoder: Decoder[BackoffPolicy] = + deriveDecoder[BackoffPolicy] +} diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala new file mode 100644 index 000000000..f0fde5fba --- /dev/null +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2012-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
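Note: `PubSubSinkConfig`'s decoders above are semi-auto circe derivations, with `durationDecoder` from circe-config handling HOCON duration strings. How a sink block decodes, as a sketch (values mirror the ConfigSpec below):

import com.typesafe.config.ConfigFactory
import io.circe.config.syntax._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig

val sinkBlock = ConfigFactory.parseString(
  """
  maxBytes = 10000000
  googleProjectId = "google-project-id"
  startupCheckInterval = 1 second
  retryInterval = 10 seconds
  backoffPolicy {
    minBackoff = 1000
    maxBackoff = 1000
    totalBackoff = 9223372036854
    multiplier = 2
    initialRpcTimeout = 10000
    maxRpcTimeout = 10000
    rpcTimeoutMultiplier = 2
  }
  """
)

val decoded = sinkBlock.as[PubSubSinkConfig] // Right(PubSubSinkConfig(...))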
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.unsafe.implicits.global +import cats.effect.{ExitCode, IO} +import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig +import org.http4s.SameSite +import org.specs2.mutable.Specification + +import java.nio.file.Paths +import scala.concurrent.duration.DurationInt + +class ConfigSpec extends Specification { + + "Config parser" should { + "be able to parse extended pubsub config" in { + parse("/config.pubsub.extended.hocon") must beRight(ConfigSpec.expectedConfig) + } + "be able to parse minimal pubsub config" in { + parse("/config.pubsub.minimal.hocon") must beRight(ConfigSpec.expectedConfig) + } + } + + private def parse(resource: String): Either[ExitCode, Config[PubSubSinkConfig]] = { + val path = Paths.get(getClass.getResource(resource).toURI) + ConfigParser.fromPath[IO, PubSubSinkConfig](Some(path)).value.unsafeRunSync() + } +} + +object ConfigSpec { + + private val expectedConfig = Config[PubSubSinkConfig]( + interface = "0.0.0.0", + port = 8080, + paths = Map.empty[String, String], + p3p = Config.P3P( + policyRef = "/w3c/p3p.xml", + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 100000, + recordLimit = 40, + timeLimit = 1000 + ), + sink = PubSubSinkConfig( + maxBytes = 10000000, + googleProjectId = "google-project-id", + backoffPolicy = PubSubSinkConfig.BackoffPolicy( + minBackoff = 1000, + maxBackoff = 1000, + totalBackoff = 9223372036854L, + multiplier = 2, + initialRpcTimeout = 10000, + maxRpcTimeout = 10000, + rpcTimeoutMultiplier = 2 + ), + startupCheckInterval = 1.second, + retryInterval = 10.seconds + ) + ) + ) + +} diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubsubConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubsubConfigSpec.scala deleted file mode 100644 index 40583f94e..000000000 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubsubConfigSpec.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright 
(c) 2014-2022 Snowplow Analytics Ltd. - * All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache - * License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. - * - * See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec - -class PubsubConfigSpec extends ConfigSpec { - makeConfigTest("pubsub", "", "") -} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala index 4fdc196c4..5f4dd8659 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala @@ -1,17 +1,16 @@ package com.snowplowanalytics.snowplow.collector.stdout -import cats.effect.Sync +import cats.effect.IO import cats.effect.kernel.Resource - import com.snowplowanalytics.snowplow.collector.core.model.Sinks import com.snowplowanalytics.snowplow.collector.core.App import com.snowplowanalytics.snowplow.collector.core.Config object StdoutCollector extends App[SinkConfig](BuildInfo) { - override def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] = { - val good = new PrintingSink(config.sink.maxBytes, System.out) - val bad = new PrintingSink(config.sink.maxBytes, System.err) + override def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]] = { + val good = new PrintingSink[IO](config.sink.maxBytes, System.out) + val bad = new PrintingSink[IO](config.sink.maxBytes, System.err) Resource.pure(Sinks(good, bad)) } }
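Note: with `App.mkSinks` specialised from a tagless `F[_]: Sync` to concrete `IO`, a sink module now implements `Sink[IO]` directly. The surface a sink has to provide, inferred from the usages in this diff; `ExampleSink` is illustrative, not part of the PR:

import cats.effect.IO
import com.snowplowanalytics.snowplow.collector.core.Sink

class ExampleSink(val maxBytes: Int) extends Sink[IO] {
  override def isHealthy: IO[Boolean] = IO.pure(true)
  override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] =
    IO(println(s"stored ${events.size} events for key $key"))
}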