diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala new file mode 100644 index 000000000..01beb4493 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -0,0 +1,19 @@ +package com.snowplowanalytics.snowplow.collector.core + +import cats.effect.{ExitCode, IO, Sync} +import cats.effect.kernel.Resource + +import com.monovore.decline.effect.CommandIOApp +import com.monovore.decline.Opts + +import io.circe.Decoder + +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +/** Base class for collector executables: a decline CommandIOApp whose command line is produced by Run.fromCli. SinkConfig must have a circe Decoder. NOTE(review): header and version are the literal strings "header" and "version", presumably placeholders; confirm before release. */ +abstract class App[SinkConfig: Decoder] + extends CommandIOApp(name = "collector", header = "header", version = "version") { +/** Creates the sinks for the given streams configuration as a Resource; left abstract for concrete apps to implement. */ + def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] +/** Command-line entry point: delegates to Run.fromCli with IO as the effect type and mkSinks as the sink factory. */ + final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](mkSinks) +} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala new file mode 100644 index 000000000..1215a8149 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/AppInfo.scala @@ -0,0 +1,7 @@ +package com.snowplowanalytics.snowplow.collector.core +/** Exposes an application's name, version and dockerAlias as strings. Not referenced by App in this patch. */ +trait AppInfo { + def name: String + def version: String + def dockerAlias: String +} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala new file mode 100644 index 000000000..f50cad604 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -0,0 +1,152 @@ +package com.snowplowanalytics.snowplow.collector.core + +import scala.concurrent.duration.FiniteDuration + +import io.circe.config.syntax._ +import io.circe.generic.semiauto._ +import io.circe.Decoder +import io.circe._ + 
+import org.http4s.SameSite
+/** Collector configuration; `SinkConfig` is the type of the sink-specific settings held in `streams.sink`. */
+case class Config[+SinkConfig](
+  interface: String,
+  port: Int,
+  paths: Map[String, String],
+  p3p: Config.P3P,
+  crossDomain: Config.CrossDomain,
+  cookie: Config.Cookie,
+  doNotTrackCookie: Config.DoNotTrackCookie,
+  cookieBounce: Config.CookieBounce,
+  redirectMacro: Config.RedirectMacro,
+  rootResponse: Config.RootResponse,
+  cors: Config.CORS,
+  streams: Config.Streams[SinkConfig],
+  monitoring: Config.Monitoring,
+  ssl: Config.SSL,
+  enableDefaultRedirect: Boolean,
+  redirectDomains: Set[String]
+)
+/** Configuration sections referenced by [[Config]], together with their circe decoders. */
+object Config {
+
+  case class P3P(
+    policyRef: String,
+    CP: String
+  )
+
+  case class CrossDomain(
+    enabled: Boolean,
+    domains: List[String],
+    secure: Boolean
+  )
+
+  case class Cookie(
+    enabled: Boolean,
+    name: String,
+    expiration: FiniteDuration,
+    domains: List[String],
+    fallbackDomain: Option[String],
+    secure: Boolean,
+    httpOnly: Boolean,
+    sameSite: Option[SameSite]
+  )
+
+  case class DoNotTrackCookie(
+    enabled: Boolean,
+    name: String,
+    value: String
+  )
+
+  case class CookieBounce(
+    enabled: Boolean,
+    name: String,
+    fallbackNetworkUserId: String,
+    forwardedProtocolHeader: Option[String]
+  )
+
+  case class RedirectMacro(
+    enabled: Boolean,
+    placeholder: Option[String]
+  )
+
+  case class RootResponse(
+    enabled: Boolean,
+    statusCode: Int,
+    headers: Map[String, String],
+    body: String
+  )
+
+  case class CORS(
+    accessControlMaxAge: FiniteDuration
+  )
+  /** Stream settings; `sink` holds whatever sink configuration the concrete collector decodes. */
+  case class Streams[+SinkConfig](
+    good: String,
+    bad: String,
+    useIpAddressAsPartitionKey: Boolean,
+    sink: SinkConfig,
+    buffer: Buffer
+  )
+
+  //sealed trait Sinks {
+  //  val maxBytes: Int
+  //}
+
+  case class Buffer(
+    byteLimit: Long,
+    recordLimit: Long,
+    timeLimit: Long
+  )
+
+  case class Monitoring(
+    metrics: Metrics
+  )
+
+  case class Metrics(
+    statsd: Statsd
+  )
+
+  case class Statsd(
+    enabled: Boolean,
+    hostname: String,
+    port: Int,
+    period: FiniteDuration,
+    prefix: String
+  )
+
+  case class SSL(
+    enable: Boolean,
+    redirect: Boolean,
+    port: Int
+  )
+  // Public implicits declare their type explicitly so resolution never relies on an inferred type or on definition order.
+  implicit val p3p: Decoder[P3P] = deriveDecoder[P3P]
+  implicit val crossDomain: Decoder[CrossDomain] = deriveDecoder[CrossDomain]
+  implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur =>
+    cur.as[String].map(_.toLowerCase) match {
+      case Right("none")   => Right(SameSite.None)
+      case Right("strict") => Right(SameSite.Strict)
+      case Right("lax")    => Right(SameSite.Lax)
+      case Right(other) =>
+        Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history))
+      case Left(err) => Left(err)
+    }
+  }
+  implicit val cookie: Decoder[Cookie] = deriveDecoder[Cookie]
+  implicit val doNotTrackCookie: Decoder[DoNotTrackCookie] = deriveDecoder[DoNotTrackCookie]
+  implicit val cookieBounce: Decoder[CookieBounce] = deriveDecoder[CookieBounce]
+  implicit val redirectMacro: Decoder[RedirectMacro] = deriveDecoder[RedirectMacro]
+  implicit val rootResponse: Decoder[RootResponse] = deriveDecoder[RootResponse]
+  implicit val cors: Decoder[CORS] = deriveDecoder[CORS]
+  implicit val buffer: Decoder[Buffer] = deriveDecoder[Buffer]
+  implicit val statsd: Decoder[Statsd] = deriveDecoder[Statsd]
+  implicit val metrics: Decoder[Metrics] = deriveDecoder[Metrics]
+  implicit val monitoring: Decoder[Monitoring] = deriveDecoder[Monitoring]
+  implicit val ssl: Decoder[SSL] = deriveDecoder[SSL]
+  /** Decoder for the full config, available whenever a decoder for the sink-specific part is in scope. */
+  implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = {
+    implicit val _ = deriveDecoder[Streams[SinkConfig]]
+    deriveDecoder[Config[SinkConfig]]
+  }
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala
new file mode 100644
index 000000000..f96026339
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParser.scala
@@ -0,0 +1,71 @@
+package com.snowplowanalytics.snowplow.collector.core
+
+import java.nio.file.{Files, Path}
+
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+import com.typesafe.config.{Config => TypesafeConfig, ConfigFactory}
+
+import scala.collection.JavaConverters._
+
+import io.circe.Decoder
+import 
io.circe.config.syntax.CirceConfigOps + +import cats.implicits._ +import cats.data.EitherT + +import cats.effect.{ExitCode, Sync} +/** Loads the collector configuration from a HOCON file; any failure is logged and surfaced as an exit code. */ +object ConfigParser { + + implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] +/** Reads and decodes the Config stored at configPath; on failure the error is logged and Left(ExitCode.Error) is returned. */ + def fromPath[F[_]: Sync, SinkConfig: Decoder]( + configPath: Path + ): EitherT[F, ExitCode, Config[SinkConfig]] = + configFromFile[F, Config[SinkConfig]](configPath) +/** Reads the file, parses it as HOCON, then resolves and decodes it as A; the message of the first failing step is logged at error level and mapped to ExitCode.Error. */ + private def configFromFile[F[_]: Sync, A: Decoder](path: Path): EitherT[F, ExitCode, A] = { + val eitherT = for { + text <- EitherT(readTextFrom[F](path)) + hocon <- EitherT.fromEither[F](hoconFromString(text)) + result <- EitherT.fromEither[F](resolve(hocon)) + } yield result + + eitherT.leftSemiflatMap { str => + Logger[F].error(str).as(ExitCode.Error) + } + } +/** Reads the whole file inside Sync.blocking and joins its lines with newlines; a non-fatal exception becomes a Left holding a descriptive message. */ + private def readTextFrom[F[_]: Sync](path: Path): F[Either[String, String]] = + Sync[F].blocking { + Either + .catchNonFatal(Files.readAllLines(path).asScala.mkString("\n")) + .leftMap(e => s"Error reading ${path.toAbsolutePath} file from filesystem: ${e.getMessage}") + } +/** Parses HOCON text; a non-fatal parse exception is returned as its message. */ + private def hoconFromString(str: String): Either[String, TypesafeConfig] = + Either.catchNonFatal(ConfigFactory.parseString(str)).leftMap(_.getMessage) +/** Resolves substitutions, layers in the defaults through loadAll, then decodes A; every error message is prefixed with "Cannot resolve config: ". */ + private def resolve[A: Decoder](hocon: TypesafeConfig): Either[String, A] = { + val either = for { + resolved <- Either.catchNonFatal(hocon.resolve()).leftMap(_.getMessage) + resolved <- Either.catchNonFatal(loadAll(resolved)).leftMap(_.getMessage) + parsed <- resolved.as[A].leftMap(_.show) + } yield parsed + either.leftMap(e => s"Cannot resolve config: $e") + } +/** Layers the configuration: the given config falls back to namespaced(ConfigFactory.load()); the merge is namespaced, handed to ConfigFactory.load and namespaced once more. The nesting order decides which source wins, so keep it as is. */ + private def loadAll(config: TypesafeConfig): TypesafeConfig = + namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.load()))))) +/** When the config has a "collector" path, returns that section with the remaining top-level entries as fallback; otherwise returns the config unchanged. */ + private def namespaced(config: TypesafeConfig): TypesafeConfig = { + val namespace = "collector" + if (config.hasPath(namespace)) + config.getConfig(namespace).withFallback(config.withoutPath(namespace)) + else + config + } + +} diff --git 
a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala
new file mode 100644
index 000000000..cfbc2ebe5
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala
@@ -0,0 +1,104 @@
+package com.snowplowanalytics.snowplow.collector.core
+
+import java.net.InetSocketAddress
+
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+import scala.concurrent.duration.DurationLong
+
+import com.comcast.ip4s.{IpAddress, Port}
+
+import cats.implicits._
+
+import cats.effect.{Async, Resource}
+
+import org.http4s.HttpApp
+import org.http4s.server.Server
+import org.http4s.ember.server.EmberServerBuilder
+import org.http4s.blaze.server.BlazeServerBuilder
+import org.http4s.netty.server.NettyServerBuilder
+
+import fs2.io.net.Network
+
+/** Builds the collector's HTTP server on one of the supported http4s backends. */
+object HttpServer {
+
+  implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F]
+
+  /**
+    * Builds a server for `app` bound to `interface` and `port`, whichever backend is used.
+    *
+    * The backend is chosen from the HTTP4S_BACKEND environment variable
+    * (case-insensitive): EMBER (also the default when unset), BLAZE or NETTY.
+    * Any other value fails the resource with an IllegalArgumentException.
+    */
+  def build[F[_]: Async](
+    app: HttpApp[F],
+    interface: String,
+    port: Int
+  ): Resource[F, Server] =
+    sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match {
+      case Some("EMBER") | None => buildEmberServer[F](app, interface, port)
+      case Some("BLAZE")        => buildBlazeServer[F](app, interface, port)
+      case Some("NETTY")        => buildNettyServer[F](app, interface, port)
+      case Some(other)          => invalid[F, Server](s"Unrecognized http4s backend $other")
+    }
+
+  /** Fails the resource with an IllegalArgumentException carrying `message`. */
+  private def invalid[F[_]: Async, A](message: String): Resource[F, A] =
+    Resource.eval(Async[F].raiseError[A](new IllegalArgumentException(message)))
+
+  /**
+    * Ember backend. `interface` must be an IP literal and `port` a valid port number,
+    * otherwise the resource fails with a descriptive IllegalArgumentException.
+    */
+  private def buildEmberServer[F[_]: Async](
+    app: HttpApp[F],
+    interface: String,
+    port: Int
+  ): Resource[F, Server] = {
+    implicit val network: Network[F] = Network.forAsync[F]
+    (IpAddress.fromString(interface), Port.fromInt(port)) match {
+      case (Some(host), Some(validPort)) =>
+        Resource.eval(Logger[F].info("Building ember server")) >>
+          EmberServerBuilder
+            .default[F]
+            .withHost(host)
+            .withPort(validPort)
+            .withHttpApp(app)
+            .withIdleTimeout(610.seconds)
+            .build
+      case (None, _) =>
+        invalid[F, Server](s"Cannot bind to interface '$interface': not a valid IP address")
+      case (_, None) =>
+        invalid[F, Server](s"Cannot bind to port $port: not a valid port number")
+    }
+  }
+
+  /** Blaze backend, bound to the configured `interface` and `port`. */
+  private def buildBlazeServer[F[_]: Async](
+    app: HttpApp[F],
+    interface: String,
+    port: Int
+  ): Resource[F, Server] =
+    Resource.eval(Logger[F].info("Building blaze server")) >>
+      BlazeServerBuilder[F]
+        .bindSocketAddress(new InetSocketAddress(interface, port))
+        .withHttpApp(app)
+        .withIdleTimeout(610.seconds)
+        .resource
+
+  /** Netty backend, bound to the configured `interface` and `port`. */
+  private def buildNettyServer[F[_]: Async](
+    app: HttpApp[F],
+    interface: String,
+    port: Int
+  ): Resource[F, Server] =
+    Resource.eval(Logger[F].info("Building netty server")) >>
+      NettyServerBuilder[F]
+        .bindHttp(port, interface)
+        .withHttpApp(app)
+        .withIdleTimeout(610.seconds)
+        .resource
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala similarity index 85% rename from http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala rename to http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index 01870a242..3b6978b78 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import cats.implicits._ import cats.effect.Sync @@ -8,7 +8,7 @@ import org.http4s.dsl.Http4sDsl import org.http4s.implicits._ import com.comcast.ip4s.Dns -class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDsl[F] { +class Routes[F[_]: Sync, SinkConfig](service: Service[F, SinkConfig]) extends Http4sDsl[F] { implicit val dns: Dns[F] = Dns.forSync[F] @@ -19,12 +19,12 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs private val cookieRoutes = HttpRoutes.of[F] { case req @ POST -> Root / vendor / version => - val path = collectorService.determinePath(vendor, version) + val path = service.determinePath(vendor, version) val userAgent = extractHeader(req, "User-Agent") val referer = extractHeader(req, 
"Referer") val spAnonymous = extractHeader(req, "SP-Anonymous") - collectorService.cookie( + service.cookie( queryString = Some(req.queryString), body = req.bodyText.compile.string.map(Some(_)), path = path, 
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
new file mode 100644
index 000000000..c95e38416
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
@@ -0,0 +1,132 @@
+package com.snowplowanalytics.snowplow.collector.core
+
+import java.nio.file.Path
+
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+//import scala.concurrent.duration.{DurationLong, FiniteDuration}
+//import scala.collection.JavaConverters._
+
+import cats.implicits._
+import cats.data.EitherT
+
+import cats.effect.{ExitCode, Sync}
+import cats.effect.kernel.Resource
+
+//import com.monovore.decline.effect.CommandIOApp
+import com.monovore.decline.Opts
+
+import io.circe.Decoder
+
+import com.snowplowanalytics.snowplow.collector.core.model.Sinks
+
+/** Wires command-line parsing and configuration loading into the collector's entry point. */
+object Run {
+
+  implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
+
+  /**
+    * Command-line definition of the collector: a mandatory `--config` / `-c` option holding
+    * the path of the HOCON file, mapped to the program that runs from that file.
+    *
+    * @param mkSinks builds the sinks from the `streams` section of the decoded config
+    */
+  def fromCli[F[_]: Sync, SinkConfig: Decoder](
+    mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]]
+  ): Opts[F[ExitCode]] = {
+    // The option is required for as long as `.orNone` stays disabled, so the help text must not call it optional.
+    val configPathOpt = Opts.option[Path]("config", "Path to HOCON configuration", "c", "config.hocon") //.orNone
+    configPathOpt.map(fromPath[F, SinkConfig](mkSinks, _))
+  }
+
+  /**
+    * Loads the config at `path` and runs the collector with it.
+    *
+    * Yields the exit code reported by `fromConfig`, the exit code produced by the config
+    * parser when loading fails, or `ExitCode.Error` (after logging) when an exception is raised.
+    */
+  private def fromPath[F[_]: Sync, SinkConfig: Decoder](
+    mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
+    path: Path
+  ): F[ExitCode] = {
+    val eitherT = for {
+      config   <- ConfigParser.fromPath[F, SinkConfig](path)
+      exitCode <- EitherT.right[ExitCode](fromConfig(mkSinks, config))
+    } yield exitCode
+
+    eitherT.merge.handleErrorWith { e =>
+      Logger[F].error(e)("Exiting") >>
+        prettyLogException(e).as(ExitCode.Error)
+    }
+  }
+
+  // TODO: not implemented yet; evaluating it throws scala.NotImplementedError (see the sketch below).
+  private def fromConfig[F[_]: Sync, SinkConfig](
+    mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
+    config: Config[SinkConfig]
+  ): F[ExitCode] = ???
+  //private def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
+  //  appInfo: AppInfo,
+  //  toSource: SourceConfig => SourceAndAck[F],
+  //  toBadSink: SinkConfig => Resource[F, Sink[F]],
+  //  config: Config.WithIglu[SourceConfig, SinkConfig]
+  //): F[ExitCode] =
+  //  Environment.fromConfig(config, appInfo, toSource, toBadSink).use { env =>
+  //    Processing
+  //      .stream(env)
+  //      .concurrently(Telemetry.stream(config.main.telemetry, env))
+  //      .concurrently(env.metrics.report)
+  //      .compile
+  //      .drain
+  //      .as(ExitCode.Success)
+  //  }
+
+  /** Logs the message of `e` at error level, followed by one "caused by" line per exception in its cause chain. */
+  def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = {
+
+    def logCause(e: Throwable): F[Unit] =
+      Option(e.getCause) match {
+        case Some(e) => Logger[F].error(s"caused by: ${e.getMessage}") >> logCause(e)
+        case None    => Sync[F].unit
+      }
+
+    Logger[F].error(e.getMessage) >> logCause(e)
+  }
+}
+
+//  def run[F[_]: Async](
+//    args: List[String],
+//    appName: String,
+//    appVersion: String,
+//    mkSinks: Config.Streams => Resource[F, Sinks[F]]
+//  ): F[ExitCode] = {
+//    val resources = for {
+//      config <- Resource.eval(Config.parse(args))
+//      sinks  <- mkSinks(config.streams)
+//      _ <- withGracefulShutdown(610.seconds) {
+//        val collectorService: CollectorService[F] = new CollectorService[F](
+//          config,
+//          Sinks(sinks.good, sinks.bad),
+//          appName,
+//          appVersion
+//        )
+//        buildHttpServer[F](new CollectorRoutes[F](collectorService).value)
+//      }
+//    } yield ()
+//
+//    resources.surround(Async[F].never[ExitCode])
+//  }
+//
+//  private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] =
+//    for {
+//      a <- resource
+//      _ <- Resource.onFinalizeCase {
+//        case Resource.ExitCase.Canceled =>
+//          Logger[F].warn(s"Shutdown interrupted. 
Will continue to serve requests for $delay") >> +// Async[F].sleep(delay) +// case _ => +// Async[F].unit +// } +// } yield a +// diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala similarity index 97% rename from http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala rename to http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index a61ac7e0b..decdf09c0 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import java.util.UUID @@ -17,9 +17,9 @@ import org.typelevel.ci._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.model._ -trait Service[F[_]] { +trait IService[F[_]] { def cookie( queryString: Option[String], body: F[Option[String]], @@ -38,12 +38,12 @@ trait Service[F[_]] { def determinePath(vendor: String, version: String): String } -class CollectorService[F[_]: Sync]( - config: Config, +class Service[F[_]: Sync, SinkConfig]( + config: Config[SinkConfig], sinks: Sinks[F], appName: String, appVersion: String -) extends Service[F] { +) extends IService[F] { // TODO: Add sink type as well private val collector = s"$appName-$appVersion" diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Sink.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Sink.scala similarity index 81% rename from 
http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Sink.scala rename to http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Sink.scala index 8cdc85935..5a5c7d05b 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Sink.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Sink.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core trait Sink[F[_]] { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala similarity index 97% rename from http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala rename to http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala index 907adcc51..492cd1e77 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatch.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import java.nio.ByteBuffer import java.nio.charset.StandardCharsets.UTF_8 @@ -14,7 +14,7 @@ import com.snowplowanalytics.iglu.core._ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.model._ /** Object handling splitting an array of strings correctly */ case class SplitBatch(appName: String, appVersion: String) { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala 
b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/model.scala similarity index 92% rename from http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala rename to http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/model.scala index 66ab01fbe..1a998715f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/model.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import io.circe.Json diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala deleted file mode 100644 index a4f9f2292..000000000 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ /dev/null @@ -1,98 +0,0 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import java.net.InetSocketAddress - -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger - -import scala.concurrent.duration.{DurationLong, FiniteDuration} - -import cats.implicits._ - -import cats.effect.{Async, ExitCode, Sync} -import cats.effect.kernel.Resource - -import fs2.io.net.Network - -import com.comcast.ip4s.IpLiteralSyntax - -import org.http4s.HttpApp -import org.http4s.server.Server -import org.http4s.ember.server.EmberServerBuilder -import org.http4s.blaze.server.BlazeServerBuilder -import org.http4s.netty.server.NettyServerBuilder - -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ - -object CollectorApp { - - implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = - Slf4jLogger.getLogger[F] - - def run[F[_]: Async]( - args: List[String], - appName: String, - appVersion: String, - mkSinks: 
Config.Streams => Resource[F, Sinks[F]] - ): F[ExitCode] = { - val resources = for { - config <- Resource.eval(Config.parse(args)) - sinks <- mkSinks(config.streams) - _ <- withGracefulShutdown(610.seconds) { - val collectorService: CollectorService[F] = new CollectorService[F]( - config, - Sinks(sinks.good, sinks.bad), - appName, - appVersion - ) - buildHttpServer[F](new CollectorRoutes[F](collectorService).value) - } - } yield () - - resources.surround(Async[F].never[ExitCode]) - } - - private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] = - for { - a <- resource - _ <- Resource.onFinalizeCase { - case Resource.ExitCase.Canceled => - Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >> - Async[F].sleep(delay) - case _ => - Async[F].unit - } - } yield a - - private def buildHttpServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = - sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match { - case Some("EMBER") | None => buildEmberServer[F](app) - case Some("BLAZE") => buildBlazeServer[F](app) - case Some("NETTY") => buildNettyServer[F](app) - case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other") - } - - private def buildEmberServer[F[_]: Async](app: HttpApp[F]) = { - implicit val network = Network.forAsync[F] - Resource.eval(Logger[F].info("Building ember server")) >> - EmberServerBuilder - .default[F] - .withHost(ipv4"0.0.0.0") - .withPort(port"8080") - .withHttpApp(app) - .withIdleTimeout(610.seconds) - .build - } - - private def buildBlazeServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = - Resource.eval(Logger[F].info("Building blaze server")) >> - BlazeServerBuilder[F] - .bindSocketAddress(new InetSocketAddress(8080)) - .withHttpApp(app) - .withIdleTimeout(610.seconds) - .resource - - private def buildNettyServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = - Resource.eval(Logger[F].info("Building netty 
server")) >> - NettyServerBuilder[F].bindLocal(8080).withHttpApp(app).withIdleTimeout(610.seconds).resource -} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala deleted file mode 100644 index ae89f23b9..000000000 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Config.scala +++ /dev/null @@ -1,304 +0,0 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import java.nio.file.{Files, Path} - -import scala.concurrent.duration.FiniteDuration -import scala.collection.JavaConverters._ - -import com.typesafe.config.ConfigFactory - -import io.circe.config.syntax._ -import io.circe.generic.semiauto._ -import io.circe.Decoder -import io.circe._ - -import cats.implicits._ - -import cats.effect.Sync - -import org.http4s.SameSite - -import com.monovore.decline.{Command, Opts} - -object Config { - - def parse[F[_]: Sync]( - args: List[String] - ): F[Config] = { - val appConfig = Opts.option[Path]("config", "Path to HOCON configuration", "c", "config.hocon") - val parser = Command[Path]("collector", "Snowplow application that collects tracking events")(appConfig) - for { - path <- parser.parse(args) match { - case Left(help) => - Sync[F].raiseError(new IllegalArgumentException(s"can't read CLI arguments. 
$help")) - case Right(path) => - Sync[F].pure(path) - } - config <- parseConfigFile(path) - } yield config - } - - private def parseConfigFile[F[_]: Sync](path: Path): F[Config] = - Either.catchNonFatal(Files.readAllLines(path).asScala.mkString("\n")) match { - case Right(raw) => - val config = ConfigFactory - .parseString(raw) - .resolve() - .withFallback(ConfigFactory.load("application.conf")) - .withFallback(ConfigFactory.load("reference.conf")) - config.as[CollectorConfig] match { - case Right(parsed) => - Sync[F].pure(parsed.collector) - case Left(err) => - Sync[F].raiseError(new IllegalArgumentException(s"can't parse config file $path. Error: $err")) - } - case Left(err) => - Sync[F].raiseError(new IllegalArgumentException(s"can't read content of file $path. Error: $err")) - } - - final case class CollectorConfig( - collector: Config - ) - final case class P3P( - policyRef: String, - CP: String - ) - final case class CrossDomain( - enabled: Boolean, - domains: List[String], - secure: Boolean - ) - final case class Cookie( - enabled: Boolean, - name: String, - expiration: FiniteDuration, - domains: List[String], - fallbackDomain: Option[String], - secure: Boolean, - httpOnly: Boolean, - sameSite: Option[SameSite] - ) - final case class DoNotTrackCookie( - enabled: Boolean, - name: String, - value: String - ) - final case class CookieBounce( - enabled: Boolean, - name: String, - fallbackNetworkUserId: String, - forwardedProtocolHeader: Option[String] - ) - final case class RedirectMacro( - enabled: Boolean, - placeholder: Option[String] - ) - final case class RootResponse( - enabled: Boolean, - statusCode: Int, - headers: Map[String, String], - body: String - ) - final case class CORS( - accessControlMaxAge: FiniteDuration - ) - final case class Streams( - good: String, - bad: String, - useIpAddressAsPartitionKey: Boolean, - sink: Sink, - buffer: Buffer - ) - sealed trait Sink { - val maxBytes: Int - } - object Sink { - final case class Kinesis( - maxBytes: 
Int, - region: String, - threadPoolSize: Int, - aws: AWSCreds, - backoffPolicy: BackoffPolicy, - customEndpoint: Option[String], - sqsGoodBuffer: Option[String], - sqsBadBuffer: Option[String], - sqsMaxBytes: Int, - startupCheckInterval: FiniteDuration - ) extends Sink { - val endpoint = customEndpoint.getOrElse(region match { - case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn" - case cn @ "cn-northwest-1" => s"https://kinesis.$cn.amazonaws.com.cn" - case _ => s"https://kinesis.$region.amazonaws.com" - }) - } - final case class Sqs( - maxBytes: Int, - region: String, - threadPoolSize: Int, - aws: AWSCreds, - backoffPolicy: BackoffPolicy, - startupCheckInterval: FiniteDuration - ) extends Sink - final case class PubSub( - maxBytes: Int, - googleProjectId: String, - backoffPolicy: PubSubBackoffPolicy, - startupCheckInterval: FiniteDuration, - retryInterval: FiniteDuration - ) extends Sink - final case class Kafka( - maxBytes: Int, - brokers: String, - retries: Int, - producerConf: Option[Map[String, String]] - ) extends Sink - final case class Nsq( - maxBytes: Int, - host: String, - port: Int - ) extends Sink - final case class Rabbitmq( - maxBytes: Int, - username: String, - password: String, - virtualHost: String, - host: String, - port: Int, - backoffPolicy: RabbitMQBackoffPolicy, - routingKeyGood: String, - routingKeyBad: String, - threadPoolSize: Option[Int] - ) extends Sink - final case class Stdout( - maxBytes: Int - ) extends Sink - final case class AWSCreds( - accessKey: String, - secretKey: String - ) - final case class BackoffPolicy( - minBackoff: Long, - maxBackoff: Long, - maxRetries: Int - ) - final case class PubSubBackoffPolicy( - minBackoff: Long, - maxBackoff: Long, - totalBackoff: Long, - multiplier: Double, - initialRpcTimeout: Long, - maxRpcTimeout: Long, - rpcTimeoutMultiplier: Double - ) - final case class RabbitMQBackoffPolicy( - minBackoff: Long, - maxBackoff: Long, - multiplier: Double - ) - } - final case class Buffer( - 
byteLimit: Long, - recordLimit: Long, - timeLimit: Long - ) - final case class Monitoring( - metrics: Metrics - ) - final case class Metrics( - statsd: Statsd - ) - final case class Statsd( - enabled: Boolean, - hostname: String, - port: Int, - period: FiniteDuration, - prefix: String - ) - final case class SSL( - enable: Boolean, - redirect: Boolean, - port: Int - ) - implicit val p3pDecoder: Decoder[P3P] = deriveDecoder[P3P] - implicit val crossDomainDecoder: Decoder[CrossDomain] = deriveDecoder[CrossDomain] - implicit val sameSiteDecoder: Decoder[SameSite] = Decoder.instance { cur => - cur.as[String].map(_.toLowerCase) match { - case Right("none") => Right(SameSite.None) - case Right("strict") => Right(SameSite.Strict) - case Right("lax") => Right(SameSite.Lax) - case Right(other) => - Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history)) - case Left(err) => Left(err) - } - } - implicit val cookieDecoder: Decoder[Cookie] = deriveDecoder[Cookie] - implicit val doNotTrackCookieDecoder: Decoder[DoNotTrackCookie] = deriveDecoder[DoNotTrackCookie] - implicit val cookieBounceDecoder: Decoder[CookieBounce] = deriveDecoder[CookieBounce] - implicit val redirectMacroDecoder: Decoder[RedirectMacro] = deriveDecoder[RedirectMacro] - implicit val rootResponseDecoder: Decoder[RootResponse] = deriveDecoder[RootResponse] - implicit val corsDecoder: Decoder[CORS] = deriveDecoder[CORS] - implicit val bufferDecoder: Decoder[Buffer] = deriveDecoder[Buffer] - implicit val awsCredsDecoder: Decoder[Sink.AWSCreds] = deriveDecoder[Sink.AWSCreds] - implicit val backoffPolicyDecoder: Decoder[Sink.BackoffPolicy] = deriveDecoder[Sink.BackoffPolicy] - implicit val kinesisDecoder: Decoder[Sink.Kinesis] = deriveDecoder[Sink.Kinesis] - implicit val sqsDecoder: Decoder[Sink.Sqs] = deriveDecoder[Sink.Sqs] - implicit val pubSubBackoffPolicyDecoder: Decoder[Sink.PubSubBackoffPolicy] = deriveDecoder[Sink.PubSubBackoffPolicy] - implicit val 
pubSubDecoder: Decoder[Sink.PubSub] = deriveDecoder[Sink.PubSub] - implicit val kafkaDecoder: Decoder[Sink.Kafka] = deriveDecoder[Sink.Kafka] - implicit val nsqDecoder: Decoder[Sink.Nsq] = deriveDecoder[Sink.Nsq] - implicit val rabbitMQBackoffPolicyDecoder: Decoder[Sink.RabbitMQBackoffPolicy] = - deriveDecoder[Sink.RabbitMQBackoffPolicy] - implicit val rabbitMQDecoder: Decoder[Sink.Rabbitmq] = deriveDecoder[Sink.Rabbitmq] - implicit val stdoutDecoder: Decoder[Sink.Stdout] = deriveDecoder[Sink.Stdout] - implicit val sinkDecoder: Decoder[Sink] = Decoder.instance { cur => - val sinkType = cur.downField("enabled") - sinkType.as[String].map(_.toLowerCase) match { - case Right("kinesis") => - cur.as[Sink.Kinesis] - case Right("sqs") => - cur.as[Sink.Sqs] - case Right("google-pub-sub") => - cur.as[Sink.PubSub] - case Right("rabbitmq") => - cur.as[Sink.Rabbitmq] - case Right("nsq") => - cur.as[Sink.Nsq] - case Right("kafka") => - cur.as[Sink.Stdout] - case Right("stdout") => - cur.as[Sink.Stdout] - case Right(other) => - Left(DecodingFailure(s"Enabled sink type $other is not supported", sinkType.history)) - case Left(DecodingFailure(_, List(CursorOp.DownField("enabled")))) => - Left(DecodingFailure("Cannot find 'enabled' string in sink configuration", sinkType.history)) - case Left(other) => - Left(other) - } - } - implicit val streamsDecoder: Decoder[Streams] = deriveDecoder[Streams] - implicit val statsdDecoder: Decoder[Statsd] = deriveDecoder[Statsd] - implicit val metricsDecoder: Decoder[Metrics] = deriveDecoder[Metrics] - implicit val monitoringDecoder: Decoder[Monitoring] = deriveDecoder[Monitoring] - implicit val sslDecoder: Decoder[SSL] = deriveDecoder[SSL] - implicit val configDecoder: Decoder[Config] = deriveDecoder[Config] - implicit val collectorConfigDecoder: Decoder[CollectorConfig] = deriveDecoder[CollectorConfig] -} - -final case class Config( - interface: String, - port: Int, - paths: Map[String, String], - p3p: Config.P3P, - crossDomain: 
Config.CrossDomain, - cookie: Config.Cookie, - doNotTrackCookie: Config.DoNotTrackCookie, - cookieBounce: Config.CookieBounce, - redirectMacro: Config.RedirectMacro, - rootResponse: Config.RootResponse, - cors: Config.CORS, - streams: Config.Streams, - monitoring: Config.Monitoring, - ssl: Config.SSL, - enableDefaultRedirect: Boolean, - redirectDomains: Set[String] -) diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala similarity index 96% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala index 5d01f34fa..fe916188a 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorRoutesSpec.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import cats.effect.IO import cats.effect.unsafe.implicits.global diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala similarity index 99% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala index 58b64852b..1b09febb7 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorServiceSpec.scala @@ -1,4 +1,4 @@ -package 
com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ import scala.collection.JavaConverters._ diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorTestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala similarity index 86% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorTestUtils.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala index e83091692..14e770969 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorTestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/CollectorTestUtils.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import cats.Applicative diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala similarity index 99% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala index 84c412d06..1fdc18122 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/SplitBatchSpec.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import org.apache.thrift.TDeserializer diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala 
b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestSink.scala similarity index 87% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestSink.scala index 2c273a603..d17aadc11 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestSink.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import cats.effect.IO diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala similarity index 96% rename from http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala rename to http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 9fd0e8869..c1430c959 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f0b442c06..9eab186b7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -100,7 +100,7 @@ object Dependencies { val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty - val decline = "com.monovore" %% "decline" % V.decline + val decline = "com.monovore" %% 
"decline-effect" % V.decline val circeGeneric = "io.circe" %% "circe-generic" % V.circe val circeConfig = "io.circe" %% "circe-config" % V.circeConfig diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala similarity index 90% rename from stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala rename to stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala index 661839903..effe73f97 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PrintingSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala @@ -1,4 +1,4 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream +package com.snowplowanalytics.snowplow.collector.stdout import java.io.PrintStream import java.util.Base64 diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala new file mode 100644 index 000000000..458c0ee8c --- /dev/null +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala @@ -0,0 +1,16 @@ +package com.snowplowanalytics.snowplow.collector.stdout + +import io.circe.Decoder +import io.circe.generic.semiauto._ + +import com.snowplowanalytics.snowplow.collector.core.Config + +/** Sink settings specific to the stdout collector; `maxBytes` is the only field and is decoded via [[SinkConfig.configDecoder]]. */ +final case class SinkConfig( + maxBytes: Int +) extends Config.Sink + +object SinkConfig { + /** Circe decoder derived semi-automatically from the fields of [[SinkConfig]]. */ + implicit val configDecoder: Decoder[SinkConfig] = deriveDecoder[SinkConfig] +} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala new file mode 100644 index 000000000..0995a7d42 --- /dev/null +++
b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala @@ -0,0 +1,18 @@ +package com.snowplowanalytics.snowplow.collector.stdout + +import cats.effect.Sync +import cats.effect.kernel.Resource + +import com.snowplowanalytics.snowplow.collector.core.{App, Config} +import com.snowplowanalytics.snowplow.collector.core.model.Sinks + +/** Collector application whose good sink prints to `System.out` and whose bad sink prints to `System.err`. */ +object StdoutCollector extends App[SinkConfig] { + + /** Builds both sinks as [[PrintingSink]]s using `config.sink.maxBytes`; wrapped with `Resource.pure` because nothing is acquired or released here. */ + override def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]] = { + val good = new PrintingSink(config.sink.maxBytes, System.out) + val bad = new PrintingSink(config.sink.maxBytes, System.err) + Resource.pure(Sinks(good, bad)) + } +} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala deleted file mode 100644 index e97b8983f..000000000 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.snowplowanalytics.snowplow.collectors.scalastream - -import cats.effect.{ExitCode, IO, IOApp, Sync} -import cats.effect.kernel.Resource - -import com.snowplowanalytics.snowplow.collectors.scalastream.Config.Sink.Stdout -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ - -object StdoutCollector extends IOApp { - - def run(args: List[String]): IO[ExitCode] = - CollectorApp.run[IO]( - args, - BuildInfo.shortName, - BuildInfo.version, - mkSinks - ) - - def mkSinks[F[_]: Sync](config: Config.Streams): Resource[F, Sinks[F]] = - config.sink match { - case Stdout(maxBytes) => - val good = new PrintingSink(maxBytes, System.out) - val bad = new PrintingSink(maxBytes, System.err) - Resource.pure(Sinks(good, bad)) - case other => - Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"sink $other is not stdout"))) - } -}