From 7b35c9910c4e5bd2dc8ce42fddeb5edf55f902eb Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Mon, 7 Aug 2023 20:18:36 +0200 Subject: [PATCH] Load config --- build.sbt | 3 + http4s/src/main/resources/reference.conf | 82 +++++ .../CollectorApp.scala | 35 ++- .../CollectorService.scala | 7 +- .../Config.scala | 293 ++++++++++++++++++ .../model.scala | 6 +- .../CollectorServiceSpec.scala | 12 +- .../TestUtils.scala | 80 ++++- project/Dependencies.scala | 25 +- .../PrintingSink.scala | 28 +- .../StdoutCollector.scala | 40 +-- .../sinks/PrintingSinkSpec.scala | 26 +- 12 files changed, 535 insertions(+), 102 deletions(-) create mode 100644 http4s/src/main/resources/reference.conf create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala diff --git a/build.sbt b/build.sbt index 78112e2bf..23ea94c6c 100644 --- a/build.sbt +++ b/build.sbt @@ -134,6 +134,9 @@ lazy val http4s = project Dependencies.Libraries.badRows, Dependencies.Libraries.collectorPayload, Dependencies.Libraries.slf4j, + Dependencies.Libraries.decline, + Dependencies.Libraries.circeGeneric, + Dependencies.Libraries.circeConfig, Dependencies.Libraries.specs2 ) ) diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf new file mode 100644 index 000000000..32db682a7 --- /dev/null +++ b/http4s/src/main/resources/reference.conf @@ -0,0 +1,82 @@ +collector { + paths {} + + p3p { + policyRef = "/w3c/p3p.xml" + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + } + + crossDomain { + enabled = false + domains = [ "*" ] + secure = true + } + + cookie { + enabled = true + expiration = 365 days + name = sp + secure = true + httpOnly = true + sameSite = "None" + } + + doNotTrackCookie { + enabled = false + name = "" + value = "" + } + + cookieBounce { + enabled = false + name = "n3pc" + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000" + } + + redirectMacro { + enabled = false + } + + rootResponse { + enabled = 
false + statusCode = 302 + headers = {} + body = "" + } + + cors { + accessControlMaxAge = 60 minutes + } + + streams { + useIpAddressAsPartitionKey = false + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } + + monitoring { + metrics { + statsd { + enabled = false + hostname = localhost + port = 8125 + period = 10 seconds + prefix = snowplow.collector + } + } + } + + ssl { + enable = false + redirect = false + port = 443 + } + + enableDefaultRedirect = false + + redirectDomains = [] +} \ No newline at end of file diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala index 82074116d..a4f9f2292 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -1,20 +1,26 @@ package com.snowplowanalytics.snowplow.collectors.scalastream +import java.net.InetSocketAddress + +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.concurrent.duration.{DurationLong, FiniteDuration} + import cats.implicits._ + import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource + import fs2.io.net.Network + import com.comcast.ip4s.IpLiteralSyntax + import org.http4s.HttpApp import org.http4s.server.Server import org.http4s.ember.server.EmberServerBuilder import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.netty.server.NettyServerBuilder -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger - -import java.net.InetSocketAddress -import scala.concurrent.duration.{DurationLong, FiniteDuration} import com.snowplowanalytics.snowplow.collectors.scalastream.model._ @@ -24,18 +30,21 @@ object CollectorApp { Slf4jLogger.getLogger[F] def run[F[_]: Async]( - 
mkGood: Resource[F, Sink[F]], - mkBad: Resource[F, Sink[F]], - config: CollectorConfig, + args: List[String], appName: String, - appVersion: String + appVersion: String, + mkSinks: Config.Streams => Resource[F, Sinks[F]] ): F[ExitCode] = { val resources = for { - bad <- mkBad - good <- mkGood + config <- Resource.eval(Config.parse(args)) + sinks <- mkSinks(config.streams) _ <- withGracefulShutdown(610.seconds) { - val sinks = CollectorSinks(good, bad) - val collectorService: CollectorService[F] = new CollectorService[F](config, sinks, appName, appVersion) + val collectorService: CollectorService[F] = new CollectorService[F]( + config, + Sinks(sinks.good, sinks.bad), + appName, + appVersion + ) buildHttpServer[F](new CollectorRoutes[F](collectorService).value) } } yield () diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index e652e0c49..7b93e98cb 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -36,8 +36,8 @@ trait Service[F[_]] { } class CollectorService[F[_]: Sync]( - config: CollectorConfig, - sinks: CollectorSinks[F], + config: Config, + sinks: Sinks[F], appName: String, appVersion: String ) extends Service[F] { @@ -65,8 +65,7 @@ class CollectorService[F[_]: Sync]( for { body <- body hostname <- hostname - // TODO: Get ipAsPartitionKey from config - (ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false) + (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey) // TODO: nuid should be set properly nuid = UUID.randomUUID().toString event = buildEvent( diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala 
b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala new file mode 100644 index 000000000..7f3bcb7d2 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala @@ -0,0 +1,293 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import java.nio.file.{Files, Path} + +import scala.concurrent.duration.FiniteDuration +import scala.collection.JavaConverters._ + +import com.typesafe.config.ConfigFactory + +import io.circe.config.syntax._ +import io.circe.generic.semiauto._ +import io.circe.Decoder +import io.circe._ + +import cats.implicits._ + +import cats.effect.Sync + +import com.monovore.decline.{Command, Opts} + +object Config { + + def parse[F[_]: Sync]( + args: List[String] + ): F[Config] = { + val appConfig = Opts.option[Path]("config", "Path to HOCON configuration", "c", "config.hocon") + val parser = Command[Path]("collector", "Snowplow application that collects tracking events")(appConfig) + for { + path <- parser.parse(args) match { + case Left(help) => + Sync[F].raiseError(new IllegalArgumentException(s"can't read CLI arguments. $help")) + case Right(path) => + Sync[F].pure(path) + } + config <- parseConfigFile(path) + } yield config + } + + private def parseConfigFile[F[_]: Sync](path: Path): F[Config] = + Either.catchNonFatal(Files.readAllLines(path).asScala.mkString("\n")) match { + case Right(raw) => + val config = ConfigFactory + .parseString(raw) + .resolve() + .withFallback(ConfigFactory.load("application.conf")) + .withFallback(ConfigFactory.load("reference.conf")) + config.as[CollectorConfig] match { + case Right(parsed) => + Sync[F].pure(parsed.collector) + case Left(err) => + Sync[F].raiseError(new IllegalArgumentException(s"can't parse config file $path. Error: $err")) + } + case Left(err) => + Sync[F].raiseError(new IllegalArgumentException(s"can't read content of file $path. 
Error: $err")) + } + + final case class CollectorConfig( + collector: Config + ) + final case class P3P( + policyRef: String, + CP: String + ) + final case class CrossDomain( + enabled: Boolean, + domains: List[String], + secure: Boolean + ) + final case class Cookie( + enabled: Boolean, + name: String, + expiration: FiniteDuration, + domains: Option[List[String]], + fallbackDomain: Option[String], + secure: Boolean, + httpOnly: Boolean, + sameSite: Option[String] + ) + final case class DoNotTrackCookie( + enabled: Boolean, + name: String, + value: String + ) + final case class CookieBounce( + enabled: Boolean, + name: String, + fallbackNetworkUserId: String, + forwardedProtocolHeader: Option[String] + ) + final case class RedirectMacro( + enabled: Boolean, + placeholder: Option[String] + ) + final case class RootResponse( + enabled: Boolean, + statusCode: Int, + headers: Map[String, String], + body: String + ) + final case class CORS( + accessControlMaxAge: FiniteDuration + ) + final case class Streams( + good: String, + bad: String, + useIpAddressAsPartitionKey: Boolean, + sink: Sink, + buffer: Buffer + ) + sealed trait Sink { + val maxBytes: Int + } + object Sink { + final case class Kinesis( + maxBytes: Int, + region: String, + threadPoolSize: Int, + aws: AWSCreds, + backoffPolicy: BackoffPolicy, + customEndpoint: Option[String], + sqsGoodBuffer: Option[String], + sqsBadBuffer: Option[String], + sqsMaxBytes: Int, + startupCheckInterval: FiniteDuration + ) extends Sink { + val endpoint = customEndpoint.getOrElse(region match { + case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn" + case cn @ "cn-northwest-1" => s"https://kinesis.$cn.amazonaws.com.cn" + case _ => s"https://kinesis.$region.amazonaws.com" + }) + } + final case class Sqs( + maxBytes: Int, + region: String, + threadPoolSize: Int, + aws: AWSCreds, + backoffPolicy: BackoffPolicy, + startupCheckInterval: FiniteDuration + ) extends Sink + final case class PubSub( + maxBytes: Int, + 
googleProjectId: String, + backoffPolicy: PubSubBackoffPolicy, + startupCheckInterval: FiniteDuration, + retryInterval: FiniteDuration + ) extends Sink + final case class Kafka( + maxBytes: Int, + brokers: String, + retries: Int, + producerConf: Option[Map[String, String]] + ) extends Sink + final case class Nsq( + maxBytes: Int, + host: String, + port: Int + ) extends Sink + final case class Rabbitmq( + maxBytes: Int, + username: String, + password: String, + virtualHost: String, + host: String, + port: Int, + backoffPolicy: RabbitMQBackoffPolicy, + routingKeyGood: String, + routingKeyBad: String, + threadPoolSize: Option[Int] + ) extends Sink + final case class Stdout( + maxBytes: Int + ) extends Sink + final case class AWSCreds( + accessKey: String, + secretKey: String + ) + final case class BackoffPolicy( + minBackoff: Long, + maxBackoff: Long, + maxRetries: Int + ) + final case class PubSubBackoffPolicy( + minBackoff: Long, + maxBackoff: Long, + totalBackoff: Long, + multiplier: Double, + initialRpcTimeout: Long, + maxRpcTimeout: Long, + rpcTimeoutMultiplier: Double + ) + final case class RabbitMQBackoffPolicy( + minBackoff: Long, + maxBackoff: Long, + multiplier: Double + ) + } + final case class Buffer( + byteLimit: Long, + recordLimit: Long, + timeLimit: Long + ) + final case class Monitoring( + metrics: Metrics + ) + final case class Metrics( + statsd: Statsd + ) + final case class Statsd( + enabled: Boolean, + hostname: String, + port: Int, + period: FiniteDuration, + prefix: String + ) + final case class SSL( + enable: Boolean, + redirect: Boolean, + port: Int + ) + implicit val p3pDecoder: Decoder[P3P] = deriveDecoder[P3P] + implicit val crossDomainDecoder: Decoder[CrossDomain] = deriveDecoder[CrossDomain] + implicit val cookieDecoder: Decoder[Cookie] = deriveDecoder[Cookie] + implicit val doNotTrackCookieDecoder: Decoder[DoNotTrackCookie] = deriveDecoder[DoNotTrackCookie] + implicit val cookieBounceDecoder: Decoder[CookieBounce] = 
deriveDecoder[CookieBounce] + implicit val redirectMacroDecoder: Decoder[RedirectMacro] = deriveDecoder[RedirectMacro] + implicit val rootResponseDecoder: Decoder[RootResponse] = deriveDecoder[RootResponse] + implicit val corsDecoder: Decoder[CORS] = deriveDecoder[CORS] + implicit val bufferDecoder: Decoder[Buffer] = deriveDecoder[Buffer] + implicit val awsCredsDecoder: Decoder[Sink.AWSCreds] = deriveDecoder[Sink.AWSCreds] + implicit val backoffPolicyDecoder: Decoder[Sink.BackoffPolicy] = deriveDecoder[Sink.BackoffPolicy] + implicit val kinesisDecoder: Decoder[Sink.Kinesis] = deriveDecoder[Sink.Kinesis] + implicit val sqsDecoder: Decoder[Sink.Sqs] = deriveDecoder[Sink.Sqs] + implicit val pubSubBackoffPolicyDecoder: Decoder[Sink.PubSubBackoffPolicy] = deriveDecoder[Sink.PubSubBackoffPolicy] + implicit val pubSubDecoder: Decoder[Sink.PubSub] = deriveDecoder[Sink.PubSub] + implicit val kafkaDecoder: Decoder[Sink.Kafka] = deriveDecoder[Sink.Kafka] + implicit val nsqDecoder: Decoder[Sink.Nsq] = deriveDecoder[Sink.Nsq] + implicit val rabbitMQBackoffPolicyDecoder: Decoder[Sink.RabbitMQBackoffPolicy] = + deriveDecoder[Sink.RabbitMQBackoffPolicy] + implicit val rabbitMQDecoder: Decoder[Sink.Rabbitmq] = deriveDecoder[Sink.Rabbitmq] + implicit val stdoutDecoder: Decoder[Sink.Stdout] = deriveDecoder[Sink.Stdout] + implicit val sinkDecoder: Decoder[Sink] = + Decoder.instance { cur => + val sinkType = cur.downField("enabled") + sinkType.as[String].map(_.toLowerCase) match { + case Right("kinesis") => + cur.as[Sink.Kinesis] + case Right("sqs") => + cur.as[Sink.Sqs] + case Right("google-pub-sub") => + cur.as[Sink.PubSub] + case Right("rabbitmq") => + cur.as[Sink.Rabbitmq] + case Right("nsq") => + cur.as[Sink.Nsq] + case Right("kafka") => + cur.as[Sink.Kafka] + case Right("stdout") => + cur.as[Sink.Stdout] + case Right(other) => + Left(DecodingFailure(s"Enabled sink type $other is not supported", sinkType.history)) + case Left(DecodingFailure(_, 
List(CursorOp.DownField("enabled")))) => + Left(DecodingFailure("Cannot find 'enabled' string in sink configuration", sinkType.history)) + case Left(other) => + Left(other) + } + } + implicit val streamsDecoder: Decoder[Streams] = deriveDecoder[Streams] + implicit val statsdDecoder: Decoder[Statsd] = deriveDecoder[Statsd] + implicit val metricsDecoder: Decoder[Metrics] = deriveDecoder[Metrics] + implicit val monitoringDecoder: Decoder[Monitoring] = deriveDecoder[Monitoring] + implicit val sslDecoder: Decoder[SSL] = deriveDecoder[SSL] + implicit val configDecoder: Decoder[Config] = deriveDecoder[Config] + implicit val collectorConfigDecoder: Decoder[CollectorConfig] = deriveDecoder[CollectorConfig] +} + +final case class Config( + interface: String, + port: Int, + paths: Map[String, String], + p3p: Config.P3P, + crossDomain: Config.CrossDomain, + cookie: Config.Cookie, + doNotTrackCookie: Config.DoNotTrackCookie, + cookieBounce: Config.CookieBounce, + redirectMacro: Config.RedirectMacro, + rootResponse: Config.RootResponse, + cors: Config.CORS, + streams: Config.Streams, + monitoring: Config.Monitoring, + ssl: Config.SSL, + enableDefaultRedirect: Boolean, + redirectDomains: Set[String] +) diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index 24a99ae9e..66ab01fbe 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -8,7 +8,7 @@ object model { * Case class for holding both good and * bad sinks for the Stream Collector. 
*/ - final case class CollectorSinks[F[_]](good: Sink[F], bad: Sink[F]) + final case class Sinks[F[_]](good: Sink[F], bad: Sink[F]) /** * Case class for holding the results of @@ -26,8 +26,4 @@ object model { * @param failedBigEvents List of events that were too large */ final case class SplitBatchResult(goodBatches: List[List[Json]], failedBigEvents: List[Json]) - - final case class CollectorConfig( - paths: Map[String, String] - ) } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index 92b2aa483..556d56165 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -15,8 +15,8 @@ class CollectorServiceSpec extends Specification { case class ProbeService(service: CollectorService[IO], good: TestSink, bad: TestSink) val service = new CollectorService[IO]( - config = TestUtils.testConf, - sinks = CollectorSinks[IO](new TestSink, new TestSink), + config = TestUtils.testConfig, + sinks = Sinks[IO](new TestSink, new TestSink), appName = "appName", appVersion = "appVersion" ) @@ -27,8 +27,8 @@ class CollectorServiceSpec extends Specification { val good = new TestSink val bad = new TestSink val service = new CollectorService[IO]( - config = TestUtils.testConf, - sinks = CollectorSinks[IO](good, bad), + config = TestUtils.testConfig, + sinks = Sinks[IO](good, bad), appName = "appName", appVersion = "appVersion" ) @@ -247,8 +247,8 @@ class CollectorServiceSpec extends Specification { "should pass on the original path if no mapping for it can be found" in { val service = new CollectorService( - TestUtils.testConf.copy(paths = Map.empty[String, String]), - CollectorSinks(new TestSink, new TestSink), + TestUtils.testConfig.copy(paths = 
Map.empty[String, String]), + Sinks(new TestSink, new TestSink), "", "" ) diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala index f0adaf65a..90282e499 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala @@ -1,14 +1,88 @@ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorConfig +import scala.concurrent.duration._ + +import com.snowplowanalytics.snowplow.collectors.scalastream.Config._ object TestUtils { - val testConf = CollectorConfig( + val testConfig = Config( + interface = "0.0.0.0", + port = 8080, paths = Map( "/com.acme/track" -> "/com.snowplowanalytics.snowplow/tp2", "/com.acme/redirect" -> "/r/tp2", "/com.acme/iglu" -> "/com.snowplowanalytics.iglu/v1" - ) + ), + p3p = P3P( + "/w3c/p3p.xml", + "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = CrossDomain( + false, + List("*"), + true + ), + cookie = Cookie( + true, + "sp", + 365.days, + None, + None, + true, + true, + Some("None") + ), + doNotTrackCookie = DoNotTrackCookie( + false, + "", + "" + ), + cookieBounce = CookieBounce( + false, + "n3pc", + "00000000-0000-4000-A000-000000000000", + None + ), + redirectMacro = RedirectMacro( + false, + None + ), + rootResponse = RootResponse( + false, + 302, + Map.empty[String, String], + "" + ), + cors = CORS(60.minutes), + streams = Streams( + "raw", + "bad-1", + false, + Sink.Stdout(1000000000), + Buffer( + 3145728, + 500, + 5000 + ) + ), + monitoring = Monitoring( + Metrics( + Statsd( + false, + "localhost", + 8125, + 10.seconds, + "snowplow.collector" + ) + ) + ), + ssl = SSL( + false, + false, + 443 + ), + enableDefaultRedirect = false, + redirectDomains = 
Set.empty[String] ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6f225e087..11c846652 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -48,14 +48,17 @@ object Dependencies { val akkaHttpMetrics = "1.7.1" val badRows = "2.1.1" val log4cats = "2.6.0" + val http4s = "0.23.23" + val blaze = "0.23.15" + val http4sNetty = "0.5.9" + val decline = "2.4.1" + val circe = "0.13.0" + val circeConfig = "0.8.0" // force circe 0.13.0 // Scala (test only) val specs2 = "4.11.0" val specs2CE = "0.4.1" val testcontainers = "0.40.10" val catsRetry = "2.1.0" - val http4s = "0.23.23" - val blaze = "0.23.15" - val http4sNetty = "0.5.9" val http4sIT = "0.21.33" } @@ -92,13 +95,15 @@ object Dependencies { val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats - - //http4s - val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s - val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s - val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze - val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty - + // http4s + val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s + val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s + val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze + val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val decline = "com.monovore" %% "decline" % V.decline + val circeGeneric = "io.circe" %% "circe-generic" % V.circe + val circeConfig = "io.circe" %% "circe-config" % V.circeConfig + // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala 
b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala index e6117809d..661839903 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala @@ -1,29 +1,19 @@ -/* - * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, and - * you may not use this file except in compliance with the Apache License - * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the Apache License Version 2.0 is distributed on an "AS - * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ package com.snowplowanalytics.snowplow.collectors.scalastream -import cats.effect.Sync -import cats.implicits._ - import java.io.PrintStream import java.util.Base64 -class PrintingSink[F[_]: Sync](stream: PrintStream) extends Sink[F] { +import cats.implicits._ + +import cats.effect.Sync + +class PrintingSink[F[_]: Sync]( + maxByteS: Int, + stream: PrintStream +) extends Sink[F] { private val encoder: Base64.Encoder = Base64.getEncoder.withoutPadding() - override val maxBytes: Int = Int.MaxValue // TODO: configurable? 
+ override val maxBytes: Int = maxByteS override def isHealthy: F[Boolean] = Sync[F].pure(true) override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index 7035382d8..e97b8983f 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -1,35 +1,29 @@ -/* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, and - * you may not use this file except in compliance with the Apache License - * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the Apache License Version 2.0 is distributed on an "AS - * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. 
- */ package com.snowplowanalytics.snowplow.collectors.scalastream +import cats.effect.{ExitCode, IO, IOApp, Sync} import cats.effect.kernel.Resource -import cats.effect.{ExitCode, IO, IOApp} + +import com.snowplowanalytics.snowplow.collectors.scalastream.Config.Sink.Stdout import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.model._ object StdoutCollector extends IOApp { - def run(args: List[String]): IO[ExitCode] = { - val good = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.out)) - val bad = Resource.pure[IO, Sink[IO]](new PrintingSink[IO](System.err)) + def run(args: List[String]): IO[ExitCode] = CollectorApp.run[IO]( - good, - bad, - CollectorConfig(Map.empty), + args, BuildInfo.shortName, - BuildInfo.version + BuildInfo.version, + mkSinks ) - } + + def mkSinks[F[_]: Sync](config: Config.Streams): Resource[F, Sinks[F]] = + config.sink match { + case Stdout(maxBytes) => + val good = new PrintingSink(maxBytes, System.out) + val bad = new PrintingSink(maxBytes, System.err) + Resource.pure(Sinks(good, bad)) + case other => + Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"sink $other is not stdout"))) + } } diff --git a/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala index 359006c92..a7c6a69be 100644 --- a/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala +++ b/stdout/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PrintingSinkSpec.scala @@ -1,33 +1,21 @@ -/* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, and - * you may not use this file except in compliance with the Apache License - * Version 2.0. 
You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the Apache License Version 2.0 is distributed on an "AS - * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +import org.specs2.mutable.Specification + import cats.effect.IO import cats.effect.unsafe.implicits.global -import com.snowplowanalytics.snowplow.collectors.scalastream.PrintingSink -import org.specs2.mutable.Specification -import java.io.{ByteArrayOutputStream, PrintStream} -import java.nio.charset.StandardCharsets +import com.snowplowanalytics.snowplow.collectors.scalastream.PrintingSink class PrintingSinkSpec extends Specification { "Printing sink" should { "print provided bytes encoded as BASE64 string" in { val baos = new ByteArrayOutputStream() - val sink = new PrintingSink[IO](new PrintStream(baos)) + val sink = new PrintingSink[IO](Integer.MAX_VALUE, new PrintStream(baos)) val input = "Something" sink.storeRawEvents(List(input.getBytes(StandardCharsets.UTF_8)), "key").unsafeRunSync()