diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala index fbdbce6e4..3f0910287 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -1,9 +1,11 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.implicits._ -import cats.effect.{ExitCode, IO} +import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource +import fs2.io.net.Network import com.comcast.ip4s.IpLiteralSyntax +import org.http4s.HttpApp import org.http4s.server.Server import org.http4s.ember.server.EmberServerBuilder import org.http4s.blaze.server.BlazeServerBuilder @@ -12,47 +14,70 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress -import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.{DurationLong, FiniteDuration} object CollectorApp { - implicit private def unsafeLogger: Logger[IO] = - Slf4jLogger.getLogger[IO] + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] - def run(): IO[ExitCode] = - buildHttpServer().use(_ => IO.never).as(ExitCode.Success) + def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = { + val resources = for { + bad <- mkBad + good <- mkGood + _ <- withGracefulShutdown(610.seconds) { + buildHttpServer[F](new CollectorRoutes[F](good, bad).value) + } + } yield () - private def buildHttpServer(): Resource[IO, Server] = + resources.surround(Async[F].never[ExitCode]) + } + + private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] = + for { + a <- resource + _ <- Resource.onFinalizeCase { + case Resource.ExitCase.Canceled => + Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >> + Async[F].sleep(delay) + case _ => + Async[F].unit + } + } yield a + + private def buildHttpServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match { - case Some("EMBER") | None => buildEmberServer - case Some("BLAZE") => buildBlazeServer - case Some("NETTY") => buildNettyServer + case Some("EMBER") | None => buildEmberServer[F](app) + case Some("BLAZE") => buildBlazeServer[F](app) + case Some("NETTY") => buildNettyServer[F](app) case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other") } - private def buildEmberServer = - Resource.eval(Logger[IO].info("Building ember server")) >> + private def buildEmberServer[F[_]: Async](app: HttpApp[F]) = { + implicit val network = Network.forAsync[F] + Resource.eval(Logger[F].info("Building ember server")) >> EmberServerBuilder - .default[IO] + .default[F] .withHost(ipv4"0.0.0.0") .withPort(port"8080") - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(app) .withIdleTimeout(610.seconds) .build + } - private def buildBlazeServer: Resource[IO, Server] = - Resource.eval(Logger[IO].info("Building blaze server")) >> - BlazeServerBuilder[IO] + private def buildBlazeServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = + Resource.eval(Logger[F].info("Building blaze server")) >> + BlazeServerBuilder[F] .bindSocketAddress(new InetSocketAddress(8080)) - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(app) .withIdleTimeout(610.seconds) .resource - private def buildNettyServer: Resource[IO, Server] = - Resource.eval(Logger[IO].info("Building netty server")) >> - NettyServerBuilder[IO] + private def buildNettyServer[F[_]: Async](app: HttpApp[F]): Resource[F, Server] = + Resource.eval(Logger[F].info("Building netty server")) >> + NettyServerBuilder[F] .bindLocal(8080) - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(app) .withIdleTimeout(610.seconds) .resource } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala index 407e3af70..3413e161e 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala @@ -4,7 +4,9 @@ import cats.effect.Sync import org.http4s.{HttpApp, HttpRoutes} import org.http4s.dsl.Http4sDsl -class CollectorRoutes[F[_]: Sync]() extends Http4sDsl[F] { +class CollectorRoutes[F[_]: Sync](good: Sink[F], bad: Sink[F]) extends Http4sDsl[F] { + + val _ = (good, bad) lazy val value: HttpApp[F] = HttpRoutes .of[F] { diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index caeec05f5..62cc51ac7 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -14,10 +14,32 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect.{ExitCode, IO, IOApp, Sync} +import cats.effect.kernel.Resource +import cats.implicits._ + +import java.util.Base64 +import java.io.PrintStream object StdoutCollector extends IOApp { - def run(args: List[String]): IO[ExitCode] = - CollectorApp.run() + def run(args: List[String]): IO[ExitCode] = { + val good = Resource.pure[IO, Sink[IO]](printingSink(System.out)) + val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err)) + CollectorApp.run[IO](good, bad) + } + + private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] { + val maxBytes = Int.MaxValue // TODO: configurable? + def isHealthy: F[Boolean] = Sync[F].pure(true) + + val encoder = Base64.getEncoder().withoutPadding() + + def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + events.traverse_ { e => + Sync[F].delay { + stream.println(encoder.encodeToString(e)) + } + } + } }