Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http4s collector scaffold #318

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ lazy val buildSettings = Seq(
name := "snowplow-stream-collector",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-Ypartial-unification"),
javacOptions := Seq("-source", "11", "-target", "11"),
resolvers ++= Dependencies.resolutionRepos
)
Expand All @@ -109,7 +110,7 @@ lazy val allSettings = buildSettings ++
lazy val root = project
.in(file("."))
.settings(buildSettings ++ dynVerSettings)
.aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq)
.aggregate(core, kinesis, pubsub, kafka, nsq, stdout, sqs, rabbitmq, http4s)

lazy val core = project
.settings(moduleName := "snowplow-stream-collector-core")
Expand All @@ -119,6 +120,17 @@ lazy val core = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)

lazy val http4s = project
.settings(moduleName := "snowplow-stream-collector-http4s-core")
.settings(buildSettings ++ BuildSettings.sbtAssemblySettings)
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.http4sDsl,
Dependencies.Libraries.http4sServer,
Dependencies.Libraries.specs2
)
)

lazy val kinesisSettings =
allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
moduleName := "snowplow-stream-collector-kinesis",
Expand Down Expand Up @@ -251,14 +263,14 @@ lazy val stdoutSettings =
lazy val stdout = project
.settings(stdoutSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val stdoutDistroless = project
.in(file("distroless/stdout"))
.settings(sourceDirectory := (stdout / sourceDirectory).value)
.settings(stdoutSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile")

lazy val rabbitmqSettings =
allSettings ++ buildInfoSettings ++ Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.IpLiteralSyntax
import org.http4s.ember.server.EmberServerBuilder

object CollectorApp {

def run(): IO[ExitCode] =
buildHttpServer().use(_ => IO.never).as(ExitCode.Success)

private def buildHttpServer() =
EmberServerBuilder
.default[IO]
.withHost(ipv4"0.0.0.0")
.withPort(port"8080")
.withHttpApp(new CollectorRoutes[IO].value)
.build
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.Sync
import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.dsl.Http4sDsl

class CollectorRoutes[F[_]: Sync]() extends Http4sDsl[F] {

lazy val value: HttpApp[F] = HttpRoutes
.of[F] {
case GET -> Root / "health" =>
Ok("OK")
}
.orNotFound
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.http4s.implicits.http4sLiteralsSyntax
import org.http4s.{Method, Request, Status}
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {

"Health endpoint" should {
"return OK always because collector always works" in {
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = new CollectorRoutes[IO].value.run(request).unsafeRunSync()

response.status must beEqualTo(Status.Ok)
response.as[String].unsafeRunSync() must beEqualTo("OK")
}
}

}
10 changes: 8 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ object Dependencies {
val specs2CE = "0.4.1"
val testcontainers = "0.40.10"
val catsRetry = "2.1.0"
val http4s = "0.21.33"
val http4s = "0.23.23"
val http4sIT = "0.21.33"
}

object Libraries {
Expand Down Expand Up @@ -87,13 +88,18 @@ object Dependencies {
val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig
val akkaHttpMetrics = "fr.davit" %% "akka-http-metrics-datadog" % V.akkaHttpMetrics


//http4s
val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s
val http4sServer = "org.http4s" %% "http4s-ember-server" % V.http4s

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest
val specs2CEIt = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest
val testcontainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest
val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest
val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4s % IntegrationTest
val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4sIT % IntegrationTest
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test
val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test
val akkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % V.akka % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,10 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.StdoutSink
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import cats.effect.{ExitCode, IO, IOApp}

object StdoutCollector extends Collector {
object StdoutCollector extends IOApp {

def appName = BuildInfo.shortName
def appVersion = BuildInfo.version
def scalaVersion = BuildInfo.scalaVersion

def main(args: Array[String]): Unit = {
val (collectorConf, akkaConf) = parseConfig(args)
val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
val sinks = {
val (good, bad) = collectorConf.streams.sink match {
case s: Stdout => (new StdoutSink(s.maxBytes, "out"), new StdoutSink(s.maxBytes, "err"))
case _ => throw new IllegalArgumentException("Configured sink is not stdout")
}
CollectorSinks(good, bad)
}
run(collectorConf, akkaConf, sinks, telemetry)
}
def run(args: List[String]): IO[ExitCode] =
CollectorApp.run()
}

This file was deleted.

This file was deleted.

Loading