diff --git a/build.sbt b/build.sbt index 23ea94c6c..1f9fc452b 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ * governing permissions and limitations there under. */ import com.typesafe.sbt.packager.docker._ -import sbtbuildinfo.BuildInfoPlugin.autoImport.buildInfoPackage +import sbtbuildinfo.BuildInfoPlugin.autoImport._ lazy val commonDependencies = Seq( // Java @@ -90,7 +90,7 @@ lazy val buildSettings = Seq( name := "snowplow-stream-collector", description := "Scala Stream Collector for Snowplow raw events", scalaVersion := "2.12.10", - scalacOptions ++= Seq("-Ypartial-unification"), + scalacOptions ++= Seq("-Ypartial-unification", "-Ywarn-macros:after"), javacOptions := Seq("-source", "11", "-target", "11"), resolvers ++= Dependencies.resolutionRepos ) @@ -100,6 +100,11 @@ lazy val dynVerSettings = Seq( ThisBuild / dynverSeparator := "-" // to be compatible with docker ) +lazy val http4sBuildInfoSettings = Seq( + buildInfoKeys := Seq[BuildInfoKey](name, dockerAlias, version), + buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo") +) + lazy val allSettings = buildSettings ++ BuildSettings.sbtAssemblySettings ++ BuildSettings.formatting ++ @@ -265,13 +270,15 @@ lazy val nsqDistroless = project .dependsOn(core % "test->test;compile->compile") lazy val stdoutSettings = - allSettings ++ buildInfoSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( moduleName := "snowplow-stream-collector-stdout", - Docker / packageName := "scala-stream-collector-stdout" + Docker / packageName := "scala-stream-collector-stdout", + ) lazy val stdout = project .settings(stdoutSettings) + .settings(buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout") .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) .dependsOn(http4s % "test->test;compile->compile") diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala 
b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala index 01beb4493..9edecc7c0 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -10,10 +10,18 @@ import io.circe.Decoder import com.snowplowanalytics.snowplow.collector.core.model.Sinks -abstract class App[SinkConfig: Decoder] - extends CommandIOApp(name = "collector", header = "header", version = "version") { +abstract class App[SinkConfig: Decoder](appInfo: AppInfo) + extends CommandIOApp( + name = App.helpCommand(appInfo), + header = "Snowplow application that collects tracking events", + version = appInfo.version + ) { def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] - final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](mkSinks) + final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks) +} + +object App { + private def helpCommand(appInfo: AppInfo) = s"docker run ${appInfo.dockerAlias}" } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index f50cad604..838e8efcd 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -89,9 +89,9 @@ object Config { buffer: Buffer ) - //sealed trait Sinks { - // val maxBytes: Int - //} + trait Sink { + val maxBytes: Int + } case class Buffer( byteLimit: Long, @@ -121,32 +121,32 @@ object Config { port: Int ) - implicit val p3p = deriveDecoder[P3P] - implicit val crossDomain = deriveDecoder[CrossDomain] - implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur => - cur.as[String].map(_.toLowerCase) match { - case Right("none") => Right(SameSite.None) - case Right("strict") => 
Right(SameSite.Strict) - case Right("lax") => Right(SameSite.Lax) - case Right(other) => - Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history)) - case Left(err) => Left(err) - } - } - implicit val cookie = deriveDecoder[Cookie] - implicit val doNotTrackCookie = deriveDecoder[DoNotTrackCookie] - implicit val cookieBounce = deriveDecoder[CookieBounce] - implicit val redirectMacro = deriveDecoder[RedirectMacro] - implicit val rootResponse = deriveDecoder[RootResponse] - implicit val cors = deriveDecoder[CORS] - implicit val buffer = deriveDecoder[Buffer] - implicit val statsd = deriveDecoder[Statsd] - implicit val metrics = deriveDecoder[Metrics] - implicit val monitoring = deriveDecoder[Monitoring] - implicit val ssl = deriveDecoder[SSL] - implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = { - implicit val _ = deriveDecoder[Streams[SinkConfig]] + implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = { + implicit val p3p = deriveDecoder[P3P] + implicit val crossDomain = deriveDecoder[CrossDomain] + implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur => + cur.as[String].map(_.toLowerCase) match { + case Right("none") => Right(SameSite.None) + case Right("strict") => Right(SameSite.Strict) + case Right("lax") => Right(SameSite.Lax) + case Right(other) => + Left(DecodingFailure(s"sameSite $other is not supported. 
Accepted values: None, Strict, Lax", cur.history)) + case Left(err) => Left(err) + } + } + implicit val cookie = deriveDecoder[Cookie] + implicit val doNotTrackCookie = deriveDecoder[DoNotTrackCookie] + implicit val cookieBounce = deriveDecoder[CookieBounce] + implicit val redirectMacro = deriveDecoder[RedirectMacro] + implicit val rootResponse = deriveDecoder[RootResponse] + implicit val cors = deriveDecoder[CORS] + implicit val buffer = deriveDecoder[Buffer] + implicit val streams = deriveDecoder[Streams[SinkConfig]] + implicit val statsd = deriveDecoder[Statsd] + implicit val metrics = deriveDecoder[Metrics] + implicit val monitoring = deriveDecoder[Monitoring] + implicit val ssl = deriveDecoder[SSL] deriveDecoder[Config[SinkConfig]] } } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index 3b6978b78..bac32c14f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -8,7 +8,7 @@ import org.http4s.dsl.Http4sDsl import org.http4s.implicits._ import com.comcast.ip4s.Dns -class Routes[F[_]: Sync, SinkConfig](service: Service[F, SinkConfig]) extends Http4sDsl[F] { +class Routes[F[_]: Sync](service: IService[F]) extends Http4sDsl[F] { implicit val dns: Dns[F] = Dns.forSync[F] diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index c95e38416..33b7c1597 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -5,16 +5,14 @@ import java.nio.file.Path import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -//import 
scala.concurrent.duration.{DurationLong, FiniteDuration} -//import scala.collection.JavaConverters._ +import scala.concurrent.duration.{DurationLong, FiniteDuration} import cats.implicits._ import cats.data.EitherT -import cats.effect.{ExitCode, Sync} +import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource -//import com.monovore.decline.effect.CommandIOApp import com.monovore.decline.Opts import io.circe.Decoder @@ -25,20 +23,22 @@ object Run { implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - def fromCli[F[_]: Sync, SinkConfig: Decoder]( + def fromCli[F[_]: Async, SinkConfig: Decoder]( + appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]] ): Opts[F[ExitCode]] = { - val configPathOpt = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon") //.orNone - configPathOpt.map(fromPath[F, SinkConfig](mkSinks, _)) + val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon") //.orNone + configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, _)) } - private def fromPath[F[_]: Sync, SinkConfig: Decoder]( + private def fromPath[F[_]: Async, SinkConfig: Decoder]( + appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], path: Path ): F[ExitCode] = { val eitherT = for { config <- ConfigParser.fromPath[F, SinkConfig](path) - _ <- EitherT.right[ExitCode](fromConfig(mkSinks, config)) + _ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config)) } yield ExitCode.Success eitherT.merge.handleErrorWith { e => @@ -47,27 +47,30 @@ object Run { } } - private def fromConfig[F[_]: Sync, SinkConfig]( + private def fromConfig[F[_]: Async, SinkConfig]( + appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], config: Config[SinkConfig] - ): F[ExitCode] = ??? 
- //private def fromConfig[F[_]: Async, SourceConfig, SinkConfig]( - // appInfo: AppInfo, - // toSource: SourceConfig => SourceAndAck[F], - // toBadSink: SinkConfig => Resource[F, Sink[F]], - // config: Config.WithIglu[SourceConfig, SinkConfig] - //): F[ExitCode] = - // Environment.fromConfig(config, appInfo, toSource, toBadSink).use { env => - // Processing - // .stream(env) - // .concurrently(Telemetry.stream(config.main.telemetry, env)) - // .concurrently(env.metrics.report) - // .compile - // .drain - // .as(ExitCode.Success) - // } - - def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = { + ): F[ExitCode] = { + val resources = for { + sinks <- mkSinks(config.streams) + collectorService = new Service[F, SinkConfig]( + config, + Sinks(sinks.good, sinks.bad), + appInfo + ) + httpServer = HttpServer.build[F]( + new Routes[F](collectorService).value, + config.interface, + config.port + ) + _ <- withGracefulShutdown(610.seconds)(httpServer) + } yield () + + resources.surround(Async[F].never[ExitCode]) + } + + private def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = { def logCause(e: Throwable): F[Unit] = Option(e.getCause) match { @@ -77,40 +80,16 @@ object Run { Logger[F].error(e.getMessage) >> logCause(e) } -} -// def run[F[_]: Async]( -// args: List[String], -// appName: String, -// appVersion: String, -// mkSinks: Config.Streams => Resource[F, Sinks[F]] -// ): F[ExitCode] = { -// val resources = for { -// config <- Resource.eval(Config.parse(args)) -// sinks <- mkSinks(config.streams) -// _ <- withGracefulShutdown(610.seconds) { -// val collectorService: CollectorService[F] = new CollectorService[F]( -// config, -// Sinks(sinks.good, sinks.bad), -// appName, -// appVersion -// ) -// buildHttpServer[F](new CollectorRoutes[F](collectorService).value) -// } -// } yield () -// -// resources.surround(Async[F].never[ExitCode]) -// } -// -// private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, 
A] = -// for { -// a <- resource -// _ <- Resource.onFinalizeCase { -// case Resource.ExitCase.Canceled => -// Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >> -// Async[F].sleep(delay) -// case _ => -// Async[F].unit -// } -// } yield a -// + private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] = + for { + a <- resource + _ <- Resource.onFinalizeCase { + case Resource.ExitCase.Canceled => + Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >> + Async[F].sleep(delay) + case _ => + Async[F].unit + } + } yield a +} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index decdf09c0..5cc87a59f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -41,14 +41,12 @@ trait IService[F[_]] { class Service[F[_]: Sync, SinkConfig]( config: Config[SinkConfig], sinks: Sinks[F], - appName: String, - appVersion: String + appInfo: AppInfo ) extends IService[F] { - // TODO: Add sink type as well - private val collector = s"$appName-$appVersion" + private val collector = s"${appInfo.name}:${appInfo.version}" - private val splitBatch: SplitBatch = SplitBatch(appName, appVersion) + private val splitBatch: SplitBatch = SplitBatch(appInfo) def cookie( queryString: Option[String], diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala index 492cd1e77..f7114be0e 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala @@ -17,7 +17,7 @@ import 
com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPa import com.snowplowanalytics.snowplow.collector.core.model._ /** Object handling splitting an array of strings correctly */ -case class SplitBatch(appName: String, appVersion: String) { +case class SplitBatch(appInfo: AppInfo) { // Serialize Thrift CollectorPayload objects val ThriftSerializer = new ThreadLocal[TSerializer] { @@ -124,7 +124,7 @@ case class SplitBatch(appName: String, appVersion: String) { ): Array[Byte] = BadRow .SizeViolation( - Processor(appName, appVersion), + Processor(appInfo.name, appInfo.version), Failure.SizeViolation(Instant.now(), maxSize, size, s"oversized collector payload: $msg"), Payload.RawPayload(event.toString().take(maxSize / 10)) ) diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala deleted file mode 100644 index 14e770969..000000000 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala +++ /dev/null @@ -1,13 +0,0 @@ -package com.snowplowanalytics.snowplow.collector.core - -import cats.Applicative - -object CollectorTestUtils { - - def noopSink[F[_]: Applicative]: Sink[F] = new Sink[F] { - val maxBytes: Int = Int.MaxValue - def isHealthy: F[Boolean] = Applicative[F].pure(true) - def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Applicative[F].unit - } - -} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala similarity index 91% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala index fe916188a..f98956912 100644 --- 
a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala @@ -1,16 +1,19 @@ package com.snowplowanalytics.snowplow.collector.core +import org.specs2.mutable.Specification + import cats.effect.IO import cats.effect.unsafe.implicits.global + import org.http4s.implicits.http4sLiteralsSyntax import org.http4s.{Method, Request, RequestCookie, Response, Status} import org.http4s.Status._ + import fs2.{Stream, text} -import org.specs2.mutable.Specification -class CollectorRoutesSpec extends Specification { +class RoutesSpec extends Specification { - val collectorService = new Service[IO] { + val service = new IService[IO] { override def cookie( queryString: Option[String], body: IO[Option[String]], @@ -30,7 +33,7 @@ class CollectorRoutesSpec extends Specification { override def determinePath(vendor: String, version: String): String = "/p1/p2" } - val routes = new CollectorRoutes[IO](collectorService).value + val routes = new Routes(service).value "The collector route" should { "respond to the health route with an ok response" in { diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala similarity index 95% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala index 1b09febb7..0caed3af7 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala @@ -22,16 +22,15 @@ import org.http4s.implicits._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload -import 
com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.model._ -class CollectorServiceSpec extends Specification { - case class ProbeService(service: CollectorService[IO], good: TestSink, bad: TestSink) +class ServiceSpec extends Specification { + case class ProbeService(service: Service[IO, Any], good: TestSink, bad: TestSink) - val service = new CollectorService[IO]( - config = TestUtils.testConfig, - sinks = Sinks[IO](new TestSink, new TestSink), - appName = "appName", - appVersion = "appVersion" + val service = new Service( + config = TestUtils.testConfig, + sinks = Sinks(new TestSink, new TestSink), + appInfo = TestUtils.appInfo ) val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r @@ -39,11 +38,10 @@ class CollectorServiceSpec extends Specification { def probeService(): ProbeService = { val good = new TestSink val bad = new TestSink - val service = new CollectorService[IO]( - config = TestUtils.testConfig, - sinks = Sinks[IO](good, bad), - appName = "appName", - appVersion = "appVersion" + val service = new Service( + config = TestUtils.testConfig, + sinks = Sinks(good, bad), + appInfo = TestUtils.appInfo ) ProbeService(service, good, bad) } @@ -114,7 +112,7 @@ class CollectorServiceSpec extends Specification { e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" e.ipAddress shouldEqual "ip" e.encoding shouldEqual "UTF-8" - e.collector shouldEqual s"appName-appVersion" + e.collector shouldEqual s"${TestUtils.appName}:${TestUtils.appVersion}" e.querystring shouldEqual "a=b" e.body shouldEqual "b" e.path shouldEqual "p" @@ -192,7 +190,7 @@ class CollectorServiceSpec extends Specification { e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" e.ipAddress shouldEqual "ip" e.encoding shouldEqual "UTF-8" - 
e.collector shouldEqual s"appName-appVersion" + e.collector shouldEqual s"${TestUtils.appName}:${TestUtils.appVersion}" e.querystring shouldEqual "q" e.body shouldEqual "b" e.path shouldEqual "p" @@ -221,7 +219,7 @@ class CollectorServiceSpec extends Specification { e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" e.ipAddress shouldEqual "ip" e.encoding shouldEqual "UTF-8" - e.collector shouldEqual s"appName-appVersion" + e.collector shouldEqual s"${TestUtils.appName}:${TestUtils.appVersion}" e.querystring shouldEqual null e.body shouldEqual null e.path shouldEqual "p" @@ -523,11 +521,10 @@ class CollectorServiceSpec extends Specification { } "should pass on the original path if no mapping for it can be found" in { - val service = new CollectorService( + val service = new Service( TestUtils.testConfig.copy(paths = Map.empty[String, String]), Sinks(new TestSink, new TestSink), - "", - "" + TestUtils.appInfo ) val expected1 = "/com.acme/track" val expected2 = "/com.acme/redirect" diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala index 1fdc18122..ef734ec1b 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala @@ -8,14 +8,17 @@ import io.circe.syntax._ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.core.SelfDescribingData + import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload + import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.collectors.scalastream.model.SplitBatchResult + +import com.snowplowanalytics.snowplow.collector.core.model.SplitBatchResult import org.specs2.mutable.Specification class SplitBatchSpec extends Specification { - 
val splitBatch: SplitBatch = SplitBatch("app", "version") + val splitBatch: SplitBatch = SplitBatch(TestUtils.appInfo) "SplitBatch.split" should { "Batch a list of strings based on size" in { @@ -70,7 +73,7 @@ class SplitBatchSpec extends Specification { sizeViolation.failure.actualSizeBytes must_== 1019 sizeViolation.failure.expectation must_== "oversized collector payload: GET requests cannot be split" sizeViolation.payload.event must_== "CollectorP" - sizeViolation.processor shouldEqual Processor("app", "version") + sizeViolation.processor shouldEqual Processor(TestUtils.appName, TestUtils.appVersion) actual.good must_== Nil } @@ -89,7 +92,7 @@ class SplitBatchSpec extends Specification { .failure .expectation must_== "oversized collector payload: cannot split POST requests which are not json expected json value got 'ssssss...' (line 1, column 1)" sizeViolation.payload.event must_== "CollectorP" - sizeViolation.processor shouldEqual Processor("app", "version") + sizeViolation.processor shouldEqual Processor(TestUtils.appName, TestUtils.appVersion) } "Reject an oversized POST CollectorPayload which would be oversized even without its body" in { @@ -118,7 +121,7 @@ class SplitBatchSpec extends Specification { sizeViolation .payload .event must_== "CollectorPayload(schema:null, ipAddress:null, timestamp:0, encoding:null, collector:null, path:ppppp" - sizeViolation.processor shouldEqual Processor("app", "version") + sizeViolation.processor shouldEqual Processor(TestUtils.appName, TestUtils.appVersion) } "Split a CollectorPayload with three large events and four very large events" in { diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index c1430c959..8b9acdd9b 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ 
b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -2,13 +2,29 @@ package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ +import cats.Applicative + import org.http4s.SameSite -import com.snowplowanalytics.snowplow.collectors.scalastream.Config._ +import com.snowplowanalytics.snowplow.collector.core.Config._ object TestUtils { + val appName = "collector-test" + val appVersion = "testVersion" + + val appInfo = new AppInfo { + def name = appName + def version = appVersion + def dockerAlias = "docker run collector" + } + + def noopSink[F[_]: Applicative]: Sink[F] = new Sink[F] { + val maxBytes: Int = Int.MaxValue + def isHealthy: F[Boolean] = Applicative[F].pure(true) + def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Applicative[F].unit + } - val testConfig = Config( + val testConfig = Config[Any]( interface = "0.0.0.0", port = 8080, paths = Map( @@ -61,7 +77,7 @@ object TestUtils { "raw", "bad-1", false, - Sink.Stdout(1000000000), + AnyRef, Buffer( 3145728, 500, diff --git a/stdout/src/main/resources/application.conf b/stdout/src/main/resources/application.conf index 6636da3dc..570541343 100644 --- a/stdout/src/main/resources/application.conf +++ b/stdout/src/main/resources/application.conf @@ -1,38 +1,7 @@ collector { streams { - useIpAddressAsPartitionKey = false - sink { - enabled = stdout maxBytes = 1000000000 } - - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } - } -} - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off } -} +} \ No newline at end of file diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala 
b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala index effe73f97..83abb72a5 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala @@ -7,6 +7,8 @@ import cats.implicits._ import cats.effect.Sync +import com.snowplowanalytics.snowplow.collector.core.Sink + class PrintingSink[F[_]: Sync]( maxByteS: Int, stream: PrintStream diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala index 458c0ee8c..59e16e209 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala @@ -3,6 +3,8 @@ package com.snowplowanalytics.snowplow.collector.stdout import io.circe.Decoder import io.circe.generic.semiauto._ +import com.snowplowanalytics.snowplow.collector.core.Config + final case class SinkConfig( maxBytes: Int ) extends Config.Sink diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala index 0995a7d42..4fdc196c4 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala @@ -1,15 +1,17 @@ package com.snowplowanalytics.snowplow.collector.stdout -import cats.effect.{ExitCode, IO, IOApp, Sync} +import cats.effect.Sync import cats.effect.kernel.Resource import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.App +import com.snowplowanalytics.snowplow.collector.core.Config -object StdoutCollector 
extends CollectorApp[SinkConfig] { +object StdoutCollector extends App[SinkConfig](BuildInfo) { override def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] = { - val good = new PrintingSink(maxBytes, System.out) - val bad = new PrintingSink(maxBytes, System.err) + val good = new PrintingSink(config.sink.maxBytes, System.out) + val bad = new PrintingSink(config.sink.maxBytes, System.err) Resource.pure(Sinks(good, bad)) } }