diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala index fbdbce6e4..73196b59a 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -1,8 +1,9 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.implicits._ -import cats.effect.{ExitCode, IO} +import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource +import fs2.io.net.Network import com.comcast.ip4s.IpLiteralSyntax import org.http4s.server.Server import org.http4s.ember.server.EmberServerBuilder @@ -12,47 +13,63 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress -import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.{DurationLong, FiniteDuration} object CollectorApp { - implicit private def unsafeLogger: Logger[IO] = - Slf4jLogger.getLogger[IO] + implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] - def run(): IO[ExitCode] = - buildHttpServer().use(_ => IO.never).as(ExitCode.Success) + def run[F[_]: Async](): F[ExitCode] = + withGracefulShutdown(610.seconds) { + buildHttpServer[F] + }.use(_ => Async[F].never[Unit]).as(ExitCode.Success) - private def buildHttpServer(): Resource[IO, Server] = + private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] = + for { + a <- resource + _ <- Resource.onFinalizeCase { + case Resource.ExitCase.Canceled => + Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >> + Async[F].sleep(delay) + case _ => + Async[F].unit + } + } yield a + + private def buildHttpServer[F[_]: Async]: Resource[F, Server] = sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match { - case Some("EMBER") | None => buildEmberServer - case Some("BLAZE") => buildBlazeServer - case Some("NETTY") => buildNettyServer + case Some("EMBER") | None => buildEmberServer[F] + case Some("BLAZE") => buildBlazeServer[F] + case Some("NETTY") => buildNettyServer[F] case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other") } - private def buildEmberServer = - Resource.eval(Logger[IO].info("Building ember server")) >> + private def buildEmberServer[F[_]: Async] = { + implicit val network = Network.forAsync[F] + Resource.eval(Logger[F].info("Building ember server")) >> EmberServerBuilder - .default[IO] + .default[F] .withHost(ipv4"0.0.0.0") .withPort(port"8080") - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(new CollectorRoutes[F].value) .withIdleTimeout(610.seconds) .build + } - private def buildBlazeServer: Resource[IO, Server] = - Resource.eval(Logger[IO].info("Building blaze server")) >> - BlazeServerBuilder[IO] + private def buildBlazeServer[F[_]: Async]: Resource[F, Server] = + Resource.eval(Logger[F].info("Building blaze server")) >> + BlazeServerBuilder[F] .bindSocketAddress(new InetSocketAddress(8080)) - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(new CollectorRoutes[F].value) .withIdleTimeout(610.seconds) .resource - private def buildNettyServer: Resource[IO, Server] = - Resource.eval(Logger[IO].info("Building netty server")) >> - NettyServerBuilder[IO] + private def buildNettyServer[F[_]: Async]: Resource[F, Server] = + Resource.eval(Logger[F].info("Building netty server")) >> + NettyServerBuilder[F] .bindLocal(8080) - .withHttpApp(new CollectorRoutes[IO].value) + .withHttpApp(new CollectorRoutes[F].value) .withIdleTimeout(610.seconds) .resource } diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index caeec05f5..848bf30d6 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -19,5 +19,5 @@ import cats.effect.{ExitCode, IO, IOApp} object StdoutCollector extends IOApp { def run(args: List[String]): IO[ExitCode] = - CollectorApp.run() + CollectorApp.run[IO] }