Skip to content

Commit

Permalink
Load config
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 7, 2023
1 parent 21ec804 commit 1a83d7b
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 12 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ lazy val http4s = project
Dependencies.Libraries.http4sNetty,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.decline,
Dependencies.Libraries.circeGeneric,
Dependencies.Libraries.circeConfig,
Dependencies.Libraries.specs2
)
)
Expand Down
82 changes: 82 additions & 0 deletions http4s/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
collector {
paths {}

p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}

crossDomain {
enabled = false
domains = [ "*" ]
secure = true
}

cookie {
enabled = true
expiration = 365 days
name = sp
secure = true
httpOnly = true
sameSite = "None"
}

doNotTrackCookie {
enabled = false
name = ""
value = ""
}

cookieBounce {
enabled = false
name = "n3pc"
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
}

redirectMacro {
enabled = false
}

rootResponse {
enabled = false
statusCode = 302
headers = {}
body = ""
}

cors {
accessControlMaxAge = 60 minutes
}

streams {
useIpAddressAsPartitionKey = false

buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

monitoring {
metrics {
statsd {
enabled = false
hostname = localhost
port = 8125
period = 10 seconds
prefix = snowplow.collector
}
}
}

ssl {
enable = false
redirect = false
port = 443
}

enableDefaultRedirect = false

redirectDomains = []
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ object CollectorApp {
implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = {
def run[F[_]: Async](
args: List[String],
mkGood: Resource[F, Sink[F]],
mkBad: Resource[F, Sink[F]]
): F[ExitCode] = {
val resources = for {
config <- Resource.eval(Config.parse(args))
_ <- Resource.eval(Sync[F].delay(println(config)))
bad <- mkBad
good <- mkGood
_ <- withGracefulShutdown(610.seconds) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import java.nio.file.{Files, Path}

import scala.concurrent.duration.FiniteDuration
import scala.collection.JavaConverters._

import com.typesafe.config.ConfigFactory

import io.circe.config.syntax._
import io.circe.generic.semiauto._
import io.circe.Decoder
import io.circe._

import cats.implicits._

import cats.effect.Sync

import com.monovore.decline.{Command, Opts}

object Config {

def parse[F[_]: Sync](
args: List[String]
): F[Config] = {
val appConfig = Opts
.option[Path]("config", "Path to HOCON configuration", "c", "config.hocon")
val parser = Command[Path]("collector", "Snowplow application that collects tracking events")(appConfig)
for {
path <- parser.parse(args) match {
case Left(help) =>
Sync[F].raiseError(new IllegalArgumentException(s"can't read CLI arguments. $help"))
case Right(path) =>
Sync[F].pure(path)
}
config <- parseConfigFile(path)
} yield config
}

private def parseConfigFile[F[_]: Sync](path: Path): F[Config] =
Either.catchNonFatal(Files.readAllLines(path).asScala.mkString("\n")) match {
case Right(raw) =>
val config = ConfigFactory.parseString(raw).resolve()
.withFallback(ConfigFactory.load("application.conf"))
.withFallback(ConfigFactory.load("reference.conf"))
config.as[CollectorConfig] match {
case Right(parsed) =>
Sync[F].pure(parsed.collector)
case Left(err) =>
Sync[F].raiseError(new IllegalArgumentException(s"can't parse config file $path. Error: $err"))
}
case Left(err) =>
Sync[F].raiseError(new IllegalArgumentException(s"can't read content of file $path. Error: $err"))
}

final case class CollectorConfig(
collector: Config
)
final case class P3P(
policyRef: String,
CP: String
)
final case class CrossDomain(
enabled: Boolean,
domains: List[String],
secure: Boolean
)
final case class Cookie(
enabled: Boolean,
name: String,
expiration: FiniteDuration,
domains: Option[List[String]],
fallbackDomain: Option[String],
secure: Boolean,
httpOnly: Boolean,
sameSite: Option[String]
)
final case class DoNotTrackCookie(
enabled: Boolean,
name: String,
value: String
)
final case class CookieBounce(
enabled: Boolean,
name: String,
fallbackNetworkUserId: String,
forwardedProtocolHeader: Option[String]
)
final case class RedirectMacro(
enabled: Boolean,
placeholder: Option[String]
)
final case class RootResponse(
enabled: Boolean,
statusCode: Int,
headers: Map[String, String],
body: String
)
final case class CORS(
accessControlMaxAge: FiniteDuration
)
final case class Streams(
good: String,
bad: String,
useIpAddressAsPartitionKey: Boolean,
sink: Sink,
buffer: Buffer
)
sealed trait Sink {
val maxBytes: Int
}
object Sink {
final case class Stdout(
maxBytes: Int
) extends Sink
}
final case class Buffer(
byteLimit: Long,
recordLimit: Long,
timeLimit: Long
)
final case class Monitoring(
metrics: Metrics
)
final case class Metrics(
statsd: Statsd
)
final case class Statsd(
enabled: Boolean,
hostname: String,
port: Int,
period: FiniteDuration,
prefix: String
)
final case class SSL(
enable: Boolean,
redirect: Boolean,
port: Int
)
implicit val p3pDecoder: Decoder[P3P] = deriveDecoder[P3P]
implicit val crossDomainDecoder: Decoder[CrossDomain] = deriveDecoder[CrossDomain]
implicit val cookieDecoder: Decoder[Cookie] = deriveDecoder[Cookie]
implicit val doNotTrackCookieDecoder: Decoder[DoNotTrackCookie] = deriveDecoder[DoNotTrackCookie]
implicit val cookieBounceDecoder: Decoder[CookieBounce] = deriveDecoder[CookieBounce]
implicit val redirectMacroDecoder: Decoder[RedirectMacro] = deriveDecoder[RedirectMacro]
implicit val rootResponseDecoder: Decoder[RootResponse] = deriveDecoder[RootResponse]
implicit val corsDecoder: Decoder[CORS] = deriveDecoder[CORS]
implicit val bufferDecoder: Decoder[Buffer] = deriveDecoder[Buffer]
implicit val stdoutDecoder: Decoder[Sink.Stdout] = deriveDecoder[Sink.Stdout]
implicit val sinkDecoder: Decoder[Sink] =
Decoder.instance { cur =>
val sinkType = cur.downField("enabled")
sinkType.as[String].map(_.toLowerCase) match {
case Right("stdout") =>
cur.as[Sink.Stdout]
case Right(other) =>
Left(DecodingFailure(s"Enabled sink type $other is not supported", sinkType.history))
case Left(DecodingFailure(_, List(CursorOp.DownField("enabled")))) =>
Left(DecodingFailure("Cannot find 'enabled' string in sink configuration", sinkType.history))
case Left(other) =>
Left(other)
}
}
implicit val streamsDecoder: Decoder[Streams] = deriveDecoder[Streams]
implicit val statsdDecoder: Decoder[Statsd] = deriveDecoder[Statsd]
implicit val metricsDecoder: Decoder[Metrics] = deriveDecoder[Metrics]
implicit val monitoringDecoder: Decoder[Monitoring] = deriveDecoder[Monitoring]
implicit val sslDecoder: Decoder[SSL] = deriveDecoder[SSL]
implicit val configDecoder: Decoder[Config] = deriveDecoder[Config]
implicit val collectorConfigDecoder: Decoder[CollectorConfig] = deriveDecoder[CollectorConfig]
}

final case class Config(
interface: String,
port: Int,
paths: Map[String, String],
p3p: Config.P3P,
crossDomain: Config.CrossDomain,
cookie: Config.Cookie,
doNotTrackCookie: Config.DoNotTrackCookie,
cookieBounce: Config.CookieBounce,
redirectMacro: Config.RedirectMacro,
rootResponse: Config.RootResponse,
cors: Config.CORS,
streams: Config.Streams,
monitoring: Config.Monitoring,
ssl: Config.SSL,
enableDefaultRedirect: Boolean,
redirectDomains: Set[String]
)
25 changes: 15 additions & 10 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ object Dependencies {
val akkaHttpMetrics = "1.7.1"
val badRows = "2.1.1"
val log4cats = "2.6.0"
val http4s = "0.23.23"
val blaze = "0.23.15"
val http4sNetty = "0.5.9"
val decline = "2.4.1"
val circe = "0.14.1"
val circeConfig = "0.10.0"
// Scala (test only)
val specs2 = "4.11.0"
val specs2CE = "0.4.1"
val testcontainers = "0.40.10"
val catsRetry = "2.1.0"
val http4s = "0.23.23"
val blaze = "0.23.15"
val http4sNetty = "0.5.9"
val http4sIT = "0.21.33"
}

Expand Down Expand Up @@ -92,13 +95,15 @@ object Dependencies {
val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics
val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats


//http4s
val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s
val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s
val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze
val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty

// http4s
val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s
val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s
val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze
val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty
val decline = "com.monovore" %% "decline" % V.decline
val circeGeneric = "io.circe" %% "circe-generic" % V.circe
val circeConfig = "io.circe" %% "circe-config" % V.circeConfig

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ object StdoutCollector extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val good = Resource.pure[IO, Sink[IO]](printingSink(System.out))
val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err))
CollectorApp.run[IO](good, bad)
CollectorApp.run[IO](
args,
good,
bad
)
}

private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] {
Expand Down

0 comments on commit 1a83d7b

Please sign in to comment.