Skip to content

Commit

Permalink
Refactoring in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 11, 2023
1 parent 77751a6 commit 294f757
Show file tree
Hide file tree
Showing 24 changed files with 487 additions and 453 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.snowplowanalytics.snowplow.collector.core

import cats.effect.{ExitCode, IO, Sync}
import cats.effect.kernel.Resource

import com.monovore.decline.effect.CommandIOApp
import com.monovore.decline.Opts

import io.circe.Decoder

import com.snowplowanalytics.snowplow.collector.core.model.Sinks

abstract class App[SinkConfig: Decoder]
extends CommandIOApp(name = "collector", header = "header", version = "version") {

def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]]

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](mkSinks)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.snowplowanalytics.snowplow.collector.core

trait AppInfo {
def name: String
def version: String
def dockerAlias: String
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration

import io.circe.config.syntax._
import io.circe.generic.semiauto._
import io.circe.Decoder
import io.circe._

import org.http4s.SameSite

case class Config[+SinkConfig](
interface: String,
port: Int,
paths: Map[String, String],
p3p: Config.P3P,
crossDomain: Config.CrossDomain,
cookie: Config.Cookie,
doNotTrackCookie: Config.DoNotTrackCookie,
cookieBounce: Config.CookieBounce,
redirectMacro: Config.RedirectMacro,
rootResponse: Config.RootResponse,
cors: Config.CORS,
streams: Config.Streams[SinkConfig],
monitoring: Config.Monitoring,
ssl: Config.SSL,
enableDefaultRedirect: Boolean,
redirectDomains: Set[String]
)

object Config {

case class P3P(
policyRef: String,
CP: String
)

case class CrossDomain(
enabled: Boolean,
domains: List[String],
secure: Boolean
)

case class Cookie(
enabled: Boolean,
name: String,
expiration: FiniteDuration,
domains: List[String],
fallbackDomain: Option[String],
secure: Boolean,
httpOnly: Boolean,
sameSite: Option[SameSite]
)

case class DoNotTrackCookie(
enabled: Boolean,
name: String,
value: String
)

case class CookieBounce(
enabled: Boolean,
name: String,
fallbackNetworkUserId: String,
forwardedProtocolHeader: Option[String]
)

case class RedirectMacro(
enabled: Boolean,
placeholder: Option[String]
)

case class RootResponse(
enabled: Boolean,
statusCode: Int,
headers: Map[String, String],
body: String
)

case class CORS(
accessControlMaxAge: FiniteDuration
)

case class Streams[+SinkConfig](
good: String,
bad: String,
useIpAddressAsPartitionKey: Boolean,
sink: SinkConfig,
buffer: Buffer
)

//sealed trait Sinks {
// val maxBytes: Int
//}

case class Buffer(
byteLimit: Long,
recordLimit: Long,
timeLimit: Long
)

case class Monitoring(
metrics: Metrics
)

case class Metrics(
statsd: Statsd
)

case class Statsd(
enabled: Boolean,
hostname: String,
port: Int,
period: FiniteDuration,
prefix: String
)

case class SSL(
enable: Boolean,
redirect: Boolean,
port: Int
)

implicit val p3p = deriveDecoder[P3P]
implicit val crossDomain = deriveDecoder[CrossDomain]
implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur =>
cur.as[String].map(_.toLowerCase) match {
case Right("none") => Right(SameSite.None)
case Right("strict") => Right(SameSite.Strict)
case Right("lax") => Right(SameSite.Lax)
case Right(other) =>
Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history))
case Left(err) => Left(err)
}
}
implicit val cookie = deriveDecoder[Cookie]
implicit val doNotTrackCookie = deriveDecoder[DoNotTrackCookie]
implicit val cookieBounce = deriveDecoder[CookieBounce]
implicit val redirectMacro = deriveDecoder[RedirectMacro]
implicit val rootResponse = deriveDecoder[RootResponse]
implicit val cors = deriveDecoder[CORS]
implicit val buffer = deriveDecoder[Buffer]
implicit val statsd = deriveDecoder[Statsd]
implicit val metrics = deriveDecoder[Metrics]
implicit val monitoring = deriveDecoder[Monitoring]
implicit val ssl = deriveDecoder[SSL]

implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = {
implicit val _ = deriveDecoder[Streams[SinkConfig]]
deriveDecoder[Config[SinkConfig]]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.snowplowanalytics.snowplow.collector.core

import java.nio.file.{Files, Path}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.typesafe.config.{Config => TypesafeConfig, ConfigFactory}

import scala.collection.JavaConverters._

import io.circe.Decoder
import io.circe.config.syntax.CirceConfigOps

import cats.implicits._
import cats.data.EitherT

import cats.effect.{ExitCode, Sync}

object ConfigParser {

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromPath[F[_]: Sync, SinkConfig: Decoder](
configPath: Path
): EitherT[F, ExitCode, Config[SinkConfig]] =
configFromFile[F, Config[SinkConfig]](configPath)

private def configFromFile[F[_]: Sync, A: Decoder](path: Path): EitherT[F, ExitCode, A] = {
val eitherT = for {
text <- EitherT(readTextFrom[F](path))
hocon <- EitherT.fromEither[F](hoconFromString(text))
result <- EitherT.fromEither[F](resolve(hocon))
} yield result

eitherT.leftSemiflatMap { str =>
Logger[F].error(str).as(ExitCode.Error)
}
}

private def readTextFrom[F[_]: Sync](path: Path): F[Either[String, String]] =
Sync[F].blocking {
Either
.catchNonFatal(Files.readAllLines(path).asScala.mkString("\n"))
.leftMap(e => s"Error reading ${path.toAbsolutePath} file from filesystem: ${e.getMessage}")
}

private def hoconFromString(str: String): Either[String, TypesafeConfig] =
Either.catchNonFatal(ConfigFactory.parseString(str)).leftMap(_.getMessage)

private def resolve[A: Decoder](hocon: TypesafeConfig): Either[String, A] = {
val either = for {
resolved <- Either.catchNonFatal(hocon.resolve()).leftMap(_.getMessage)
resolved <- Either.catchNonFatal(loadAll(resolved)).leftMap(_.getMessage)
parsed <- resolved.as[A].leftMap(_.show)
} yield parsed
either.leftMap(e => s"Cannot resolve config: $e")
}

private def loadAll(config: TypesafeConfig): TypesafeConfig =
namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.load())))))

private def namespaced(config: TypesafeConfig): TypesafeConfig = {
val namespace = "collector"
if (config.hasPath(namespace))
config.getConfig(namespace).withFallback(config.withoutPath(namespace))
else
config
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.snowplowanalytics.snowplow.collector.core

import java.net.InetSocketAddress

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.DurationLong

import com.comcast.ip4s.{IpAddress, Port}

import cats.implicits._

import cats.effect.{Async, Resource}

import org.http4s.HttpApp
import org.http4s.server.Server
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.netty.server.NettyServerBuilder

import fs2.io.net.Network

object HttpServer {

implicit private def logger[F[_]: Async] = Slf4jLogger.getLogger[F]

def build[F[_]: Async](
app: HttpApp[F],
interface: String,
port: Int
): Resource[F, Server] =
sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match {
case Some("EMBER") | None => buildEmberServer[F](app, interface, port)
case Some("BLAZE") => buildBlazeServer[F](app, port)
case Some("NETTY") => buildNettyServer[F](app, port)
case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other")
}

private def buildEmberServer[F[_]: Async](
app: HttpApp[F],
interface: String,
port: Int
) = {
implicit val network = Network.forAsync[F]
Resource.eval(Logger[F].info("Building ember server")) >>
EmberServerBuilder
.default[F]
.withHost(IpAddress.fromString(interface).get)
.withPort(Port.fromInt(port).get)
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.build
}

private def buildBlazeServer[F[_]: Async](
app: HttpApp[F],
port: Int
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(port))
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.resource

private def buildNettyServer[F[_]: Async](
app: HttpApp[F],
port: Int
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building netty server")) >>
NettyServerBuilder[F].bindLocal(port).withHttpApp(app).withIdleTimeout(610.seconds).resource
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.snowplowanalytics.snowplow.collectors.scalastream
package com.snowplowanalytics.snowplow.collector.core

import cats.implicits._
import cats.effect.Sync
Expand All @@ -8,7 +8,7 @@ import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns

class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDsl[F] {
class Routes[F[_]: Sync, SinkConfig](service: Service[F, SinkConfig]) extends Http4sDsl[F] {

implicit val dns: Dns[F] = Dns.forSync[F]

Expand All @@ -19,12 +19,12 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs

private val cookieRoutes = HttpRoutes.of[F] {
case req @ POST -> Root / vendor / version =>
val path = collectorService.determinePath(vendor, version)
val path = service.determinePath(vendor, version)
val userAgent = extractHeader(req, "User-Agent")
val referer = extractHeader(req, "Referer")
val spAnonymous = extractHeader(req, "SP-Anonymous")

collectorService.cookie(
service.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
Expand Down
Loading

0 comments on commit 294f757

Please sign in to comment.