diff --git a/build.sbt b/build.sbt index 20198ca56..6d65b6564 100644 --- a/build.sbt +++ b/build.sbt @@ -131,6 +131,9 @@ lazy val http4s = project Dependencies.Libraries.http4sNetty, Dependencies.Libraries.log4cats, Dependencies.Libraries.slf4j, + Dependencies.Libraries.decline, + Dependencies.Libraries.circeGeneric, + Dependencies.Libraries.circeConfig, Dependencies.Libraries.specs2 ) ) diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf new file mode 100644 index 000000000..3461d06db --- /dev/null +++ b/http4s/src/main/resources/reference.conf @@ -0,0 +1,82 @@ +collector { + paths {} + + p3p { + policyRef = "/w3c/p3p.xml" + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + } + + crossDomain { + enabled = false + domains = [ "*" ] + secure = true + } + + cookie { + enabled = true + expiration = 365 days + name = sp + secure = true + httpOnly = true + sameSite = "None" + } + + doNotTrackCookie { + enabled = false + name = "" + value = "" + } + + cookieBounce { + enabled = false + name = "n3pc" + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000" + } + + redirectMacro { + enabled = false + } + + rootResponse { + enabled = false + statusCode = 302 + headers = {} + body = "" + } + + cors { + accessControlMaxAge = 60 minutes + } + + streams { + useIpAddressAsPartitionKey = false + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } + + monitoring { + metrics { + statsd { + enabled = false + hostname = localhost + port = 8125 + period = 10 seconds + prefix = snowplow.collector + } + } + } + + ssl { + enable = false + redirect = false + port = 443 + } + + enableDefaultRedirect = false + + redirectDomains = [] +} \ No newline at end of file diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala index 3e7f82e36..60ed15f18 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -21,8 +21,14 @@ object CollectorApp { implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = { + def run[F[_]: Async]( + args: List[String], + mkGood: Resource[F, Sink[F]], + mkBad: Resource[F, Sink[F]] + ): F[ExitCode] = { val resources = for { + config <- Resource.eval(Config.parse(args)) + _ <- Resource.eval(Sync[F].delay(println(config))) bad <- mkBad good <- mkGood _ <- withGracefulShutdown(610.seconds) { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala new file mode 100644 index 000000000..208ff332d --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala @@ -0,0 +1,190 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import java.nio.file.{Files, Path} + +import scala.concurrent.duration.FiniteDuration +import scala.collection.JavaConverters._ + +import com.typesafe.config.ConfigFactory + +import io.circe.config.syntax._ +import io.circe.generic.semiauto._ +import io.circe.Decoder +import io.circe._ + +import cats.implicits._ + +import cats.effect.Sync + +import com.monovore.decline.{Command, Opts} + +object Config { + + def parse[F[_]: Sync]( + args: List[String] + ): F[Config] = { + val appConfig = Opts + .option[Path]("config", "Path to HOCON configuration", "c", "config.hocon") + val parser = Command[Path]("collector", "Snowplow application that collects tracking events")(appConfig) + for { + path <- parser.parse(args) match { + case Left(help) => + Sync[F].raiseError(new IllegalArgumentException(s"can't read CLI arguments. $help")) + case Right(path) => + Sync[F].pure(path) + } + config <- parseConfigFile(path) + } yield config + } + + private def parseConfigFile[F[_]: Sync](path: Path): F[Config] = + Either.catchNonFatal(Files.readAllLines(path).asScala.mkString("\n")) match { + case Right(raw) => + val config = ConfigFactory.parseString(raw).resolve() + .withFallback(ConfigFactory.load("application.conf")) + .withFallback(ConfigFactory.load("reference.conf")) + config.as[CollectorConfig] match { + case Right(parsed) => + Sync[F].pure(parsed.collector) + case Left(err) => + Sync[F].raiseError(new IllegalArgumentException(s"can't parse config file $path. Error: $err")) + } + case Left(err) => + Sync[F].raiseError(new IllegalArgumentException(s"can't read content of file $path. Error: $err")) + } + + final case class CollectorConfig( + collector: Config + ) + final case class P3P( + policyRef: String, + CP: String + ) + final case class CrossDomain( + enabled: Boolean, + domains: List[String], + secure: Boolean + ) + final case class Cookie( + enabled: Boolean, + name: String, + expiration: FiniteDuration, + domains: Option[List[String]], + fallbackDomain: Option[String], + secure: Boolean, + httpOnly: Boolean, + sameSite: Option[String] + ) + final case class DoNotTrackCookie( + enabled: Boolean, + name: String, + value: String + ) + final case class CookieBounce( + enabled: Boolean, + name: String, + fallbackNetworkUserId: String, + forwardedProtocolHeader: Option[String] + ) + final case class RedirectMacro( + enabled: Boolean, + placeholder: Option[String] + ) + final case class RootResponse( + enabled: Boolean, + statusCode: Int, + headers: Map[String, String], + body: String + ) + final case class CORS( + accessControlMaxAge: FiniteDuration + ) + final case class Streams( + good: String, + bad: String, + useIpAddressAsPartitionKey: Boolean, + sink: Sink, + buffer: Buffer + ) + sealed trait Sink { + val maxBytes: Int + } + object Sink { + final case class Stdout( + maxBytes: Int + ) extends Sink + } + final case class Buffer( + byteLimit: Long, + recordLimit: Long, + timeLimit: Long + ) + final case class Monitoring( + metrics: Metrics + ) + final case class Metrics( + statsd: Statsd + ) + final case class Statsd( + enabled: Boolean, + hostname: String, + port: Int, + period: FiniteDuration, + prefix: String + ) + final case class SSL( + enable: Boolean, + redirect: Boolean, + port: Int + ) + implicit val p3pDecoder: Decoder[P3P] = deriveDecoder[P3P] + implicit val crossDomainDecoder: Decoder[CrossDomain] = deriveDecoder[CrossDomain] + implicit val cookieDecoder: Decoder[Cookie] = deriveDecoder[Cookie] + implicit val doNotTrackCookieDecoder: Decoder[DoNotTrackCookie] = deriveDecoder[DoNotTrackCookie] + implicit val cookieBounceDecoder: Decoder[CookieBounce] = deriveDecoder[CookieBounce] + implicit val redirectMacroDecoder: Decoder[RedirectMacro] = deriveDecoder[RedirectMacro] + implicit val rootResponseDecoder: Decoder[RootResponse] = deriveDecoder[RootResponse] + implicit val corsDecoder: Decoder[CORS] = deriveDecoder[CORS] + implicit val bufferDecoder: Decoder[Buffer] = deriveDecoder[Buffer] + implicit val stdoutDecoder: Decoder[Sink.Stdout] = deriveDecoder[Sink.Stdout] + implicit val sinkDecoder: Decoder[Sink] = + Decoder.instance { cur => + val sinkType = cur.downField("enabled") + sinkType.as[String].map(_.toLowerCase) match { + case Right("stdout") => + cur.as[Sink.Stdout] + case Right(other) => + Left(DecodingFailure(s"Enabled sink type $other is not supported", sinkType.history)) + case Left(DecodingFailure(_, List(CursorOp.DownField("enabled")))) => + Left(DecodingFailure("Cannot find 'enabled' string in sink configuration", sinkType.history)) + case Left(other) => + Left(other) + } + } + implicit val streamsDecoder: Decoder[Streams] = deriveDecoder[Streams] + implicit val statsdDecoder: Decoder[Statsd] = deriveDecoder[Statsd] + implicit val metricsDecoder: Decoder[Metrics] = deriveDecoder[Metrics] + implicit val monitoringDecoder: Decoder[Monitoring] = deriveDecoder[Monitoring] + implicit val sslDecoder: Decoder[SSL] = deriveDecoder[SSL] + implicit val configDecoder: Decoder[Config] = deriveDecoder[Config] + implicit val collectorConfigDecoder: Decoder[CollectorConfig] = deriveDecoder[CollectorConfig] +} + +final case class Config( + interface: String, + port: Int, + paths: Map[String, String], + p3p: Config.P3P, + crossDomain: Config.CrossDomain, + cookie: Config.Cookie, + doNotTrackCookie: Config.DoNotTrackCookie, + cookieBounce: Config.CookieBounce, + redirectMacro: Config.RedirectMacro, + rootResponse: Config.RootResponse, + cors: Config.CORS, + streams: Config.Streams, + monitoring: Config.Monitoring, + ssl: Config.SSL, + enableDefaultRedirect: Boolean, + redirectDomains: Set[String] +) \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6f225e087..664a153d2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -48,14 +48,17 @@ object Dependencies { val akkaHttpMetrics = "1.7.1" val badRows = "2.1.1" val log4cats = "2.6.0" + val http4s = "0.23.23" + val blaze = "0.23.15" + val http4sNetty = "0.5.9" + val decline = "2.4.1" + val circe = "0.14.1" + val circeConfig = "0.10.0" // Scala (test only) val specs2 = "4.11.0" val specs2CE = "0.4.1" val testcontainers = "0.40.10" val catsRetry = "2.1.0" - val http4s = "0.23.23" - val blaze = "0.23.15" - val http4sNetty = "0.5.9" val http4sIT = "0.21.33" } @@ -92,13 +95,15 @@ object Dependencies { val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats - - //http4s - val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s - val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s - val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze - val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty - + // http4s + val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s + val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s + val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze + val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val decline = "com.monovore" %% "decline" % V.decline + val circeGeneric = "io.circe" %% "circe-generic" % V.circe + val circeConfig = "io.circe" %% "circe-config" % V.circeConfig + // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index 62cc51ac7..8bd54d977 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -26,7 +26,11 @@ object StdoutCollector extends IOApp { def run(args: List[String]): IO[ExitCode] = { val good = Resource.pure[IO, Sink[IO]](printingSink(System.out)) val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err)) - CollectorApp.run[IO](good, bad) + CollectorApp.run[IO]( + args, + good, + bad + ) } private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] {