Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 14, 2023
1 parent 294f757 commit f06ab45
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 187 deletions.
15 changes: 11 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* governing permissions and limitations there under.
*/
import com.typesafe.sbt.packager.docker._
import sbtbuildinfo.BuildInfoPlugin.autoImport.buildInfoPackage
import sbtbuildinfo.BuildInfoPlugin.autoImport._

lazy val commonDependencies = Seq(
// Java
Expand Down Expand Up @@ -90,7 +90,7 @@ lazy val buildSettings = Seq(
name := "snowplow-stream-collector",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-Ypartial-unification"),
scalacOptions ++= Seq("-Ypartial-unification", "-Ywarn-macros:after"),
javacOptions := Seq("-source", "11", "-target", "11"),
resolvers ++= Dependencies.resolutionRepos
)
Expand All @@ -100,6 +100,11 @@ lazy val dynVerSettings = Seq(
ThisBuild / dynverSeparator := "-" // to be compatible with docker
)

lazy val http4sBuildInfoSettings = Seq(
buildInfoKeys := Seq[BuildInfoKey](name, dockerAlias, version),
buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo")
)

lazy val allSettings = buildSettings ++
BuildSettings.sbtAssemblySettings ++
BuildSettings.formatting ++
Expand Down Expand Up @@ -265,13 +270,15 @@ lazy val nsqDistroless = project
.dependsOn(core % "test->test;compile->compile")

lazy val stdoutSettings =
allSettings ++ buildInfoSettings ++ Seq(
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
moduleName := "snowplow-stream-collector-stdout",
Docker / packageName := "scala-stream-collector-stdout"
Docker / packageName := "scala-stream-collector-stdout",

)

lazy val stdout = project
.settings(stdoutSettings)
.settings(buildInfoPackage := s"com.snowplowanalytics.snowplow.collector.stdout")
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(http4s % "test->test;compile->compile")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ import io.circe.Decoder

import com.snowplowanalytics.snowplow.collector.core.model.Sinks

abstract class App[SinkConfig: Decoder]
extends CommandIOApp(name = "collector", header = "header", version = "version") {
abstract class App[SinkConfig: Decoder](appInfo: AppInfo)
extends CommandIOApp(
name = App.helpCommand(appInfo),
header = "Snowplow application that collects tracking events",
version = appInfo.version
) {

def mkSinks[F[_]: Sync](config: Config.Streams[SinkConfig]): Resource[F, Sinks[F]]

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](mkSinks)
final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks)
}

object App {
private def helpCommand(appInfo: AppInfo) = s"docker run ${appInfo.dockerAlias}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ object Config {
buffer: Buffer
)

//sealed trait Sinks {
// val maxBytes: Int
//}
trait Sink {
val maxBytes: Int
}

case class Buffer(
byteLimit: Long,
Expand Down Expand Up @@ -121,32 +121,31 @@ object Config {
port: Int
)

implicit val p3p = deriveDecoder[P3P]
implicit val crossDomain = deriveDecoder[CrossDomain]
implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur =>
cur.as[String].map(_.toLowerCase) match {
case Right("none") => Right(SameSite.None)
case Right("strict") => Right(SameSite.Strict)
case Right("lax") => Right(SameSite.Lax)
case Right(other) =>
Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history))
case Left(err) => Left(err)
}
}
implicit val cookie = deriveDecoder[Cookie]
implicit val doNotTrackCookie = deriveDecoder[DoNotTrackCookie]
implicit val cookieBounce = deriveDecoder[CookieBounce]
implicit val redirectMacro = deriveDecoder[RedirectMacro]
implicit val rootResponse = deriveDecoder[RootResponse]
implicit val cors = deriveDecoder[CORS]
implicit val buffer = deriveDecoder[Buffer]
implicit val statsd = deriveDecoder[Statsd]
implicit val metrics = deriveDecoder[Metrics]
implicit val monitoring = deriveDecoder[Monitoring]
implicit val ssl = deriveDecoder[SSL]

implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = {
implicit val _ = deriveDecoder[Streams[SinkConfig]]
implicit val p3p = deriveDecoder[P3P]
implicit val crossDomain = deriveDecoder[CrossDomain]
implicit val sameSite: Decoder[SameSite] = Decoder.instance { cur =>
cur.as[String].map(_.toLowerCase) match {
case Right("none") => Right(SameSite.None)
case Right("strict") => Right(SameSite.Strict)
case Right("lax") => Right(SameSite.Lax)
case Right(other) =>
Left(DecodingFailure(s"sameSite $other is not supported. Accepted values: None, Strict, Lax", cur.history))
case Left(err) => Left(err)
}
}
implicit val cookie = deriveDecoder[Cookie]
implicit val doNotTrackCookie = deriveDecoder[DoNotTrackCookie]
implicit val cookieBounce = deriveDecoder[CookieBounce]
implicit val redirectMacro = deriveDecoder[RedirectMacro]
implicit val rootResponse = deriveDecoder[RootResponse]
implicit val cors = deriveDecoder[CORS]
implicit val buffer = deriveDecoder[Buffer]
implicit val streams = deriveDecoder[Streams[SinkConfig]]
implicit val statsd = deriveDecoder[Statsd]
implicit val metrics = deriveDecoder[Metrics]
implicit val monitoring = deriveDecoder[Monitoring]
implicit val ssl = deriveDecoder[SSL]
deriveDecoder[Config[SinkConfig]]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns

class Routes[F[_]: Sync, SinkConfig](service: Service[F, SinkConfig]) extends Http4sDsl[F] {
class Routes[F[_]: Sync](service: IService[F]) extends Http4sDsl[F] {

implicit val dns: Dns[F] = Dns.forSync[F]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import java.nio.file.Path
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

//import scala.concurrent.duration.{DurationLong, FiniteDuration}
//import scala.collection.JavaConverters._
import scala.concurrent.duration.{DurationLong, FiniteDuration}

import cats.implicits._
import cats.data.EitherT

import cats.effect.{ExitCode, Sync}
import cats.effect.{Async, ExitCode, Sync}
import cats.effect.kernel.Resource

//import com.monovore.decline.effect.CommandIOApp
import com.monovore.decline.Opts

import io.circe.Decoder
Expand All @@ -25,20 +23,22 @@ object Run {

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Sync, SinkConfig: Decoder](
def fromCli[F[_]: Async, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]]
): Opts[F[ExitCode]] = {
val configPathOpt = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon") //.orNone
configPathOpt.map(fromPath[F, SinkConfig](mkSinks, _))
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon") //.orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, _))
}

private def fromPath[F[_]: Sync, SinkConfig: Decoder](
private def fromPath[F[_]: Async, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
path: Path
): F[ExitCode] = {
val eitherT = for {
config <- ConfigParser.fromPath[F, SinkConfig](path)
_ <- EitherT.right[ExitCode](fromConfig(mkSinks, config))
_ <- EitherT.right[ExitCode](fromConfig(appInfo, mkSinks, config))
} yield ExitCode.Success

eitherT.merge.handleErrorWith { e =>
Expand All @@ -47,27 +47,30 @@ object Run {
}
}

private def fromConfig[F[_]: Sync, SinkConfig](
private def fromConfig[F[_]: Async, SinkConfig](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
config: Config[SinkConfig]
): F[ExitCode] = ???
//private def fromConfig[F[_]: Async, SourceConfig, SinkConfig](
// appInfo: AppInfo,
// toSource: SourceConfig => SourceAndAck[F],
// toBadSink: SinkConfig => Resource[F, Sink[F]],
// config: Config.WithIglu[SourceConfig, SinkConfig]
//): F[ExitCode] =
// Environment.fromConfig(config, appInfo, toSource, toBadSink).use { env =>
// Processing
// .stream(env)
// .concurrently(Telemetry.stream(config.main.telemetry, env))
// .concurrently(env.metrics.report)
// .compile
// .drain
// .as(ExitCode.Success)
// }

def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = {
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
collectorService = new Service[F, SinkConfig](
config,
Sinks(sinks.good, sinks.bad),
appInfo
)
httpServer = HttpServer.build[F](
new Routes[F](collectorService).value,
config.interface,
config.port
)
_ <- withGracefulShutdown(610.seconds)(httpServer)
} yield ()

resources.surround(Async[F].never[ExitCode])
}

private def prettyLogException[F[_]: Sync](e: Throwable): F[Unit] = {

def logCause(e: Throwable): F[Unit] =
Option(e.getCause) match {
Expand All @@ -77,40 +80,16 @@ object Run {

Logger[F].error(e.getMessage) >> logCause(e)
}
}

// def run[F[_]: Async](
// args: List[String],
// appName: String,
// appVersion: String,
// mkSinks: Config.Streams => Resource[F, Sinks[F]]
// ): F[ExitCode] = {
// val resources = for {
// config <- Resource.eval(Config.parse(args))
// sinks <- mkSinks(config.streams)
// _ <- withGracefulShutdown(610.seconds) {
// val collectorService: CollectorService[F] = new CollectorService[F](
// config,
// Sinks(sinks.good, sinks.bad),
// appName,
// appVersion
// )
// buildHttpServer[F](new CollectorRoutes[F](collectorService).value)
// }
// } yield ()
//
// resources.surround(Async[F].never[ExitCode])
// }
//
// private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] =
// for {
// a <- resource
// _ <- Resource.onFinalizeCase {
// case Resource.ExitCase.Canceled =>
// Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >>
// Async[F].sleep(delay)
// case _ =>
// Async[F].unit
// }
// } yield a
//
private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] =
for {
a <- resource
_ <- Resource.onFinalizeCase {
case Resource.ExitCase.Canceled =>
Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >>
Async[F].sleep(delay)
case _ =>
Async[F].unit
}
} yield a
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ trait IService[F[_]] {
class Service[F[_]: Sync, SinkConfig](
config: Config[SinkConfig],
sinks: Sinks[F],
appName: String,
appVersion: String
appInfo: AppInfo
) extends IService[F] {

// TODO: Add sink type as well
private val collector = s"$appName-$appVersion"
private val collector = s"${appInfo.name}:${appInfo.version}"

private val splitBatch: SplitBatch = SplitBatch(appName, appVersion)
private val splitBatch: SplitBatch = SplitBatch(appInfo)

def cookie(
queryString: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPa
import com.snowplowanalytics.snowplow.collector.core.model._

/** Object handling splitting an array of strings correctly */
case class SplitBatch(appName: String, appVersion: String) {
case class SplitBatch(appInfo: AppInfo) {

// Serialize Thrift CollectorPayload objects
val ThriftSerializer = new ThreadLocal[TSerializer] {
Expand Down Expand Up @@ -124,7 +124,7 @@ case class SplitBatch(appName: String, appVersion: String) {
): Array[Byte] =
BadRow
.SizeViolation(
Processor(appName, appVersion),
Processor(appInfo.name, appInfo.version),
Failure.SizeViolation(Instant.now(), maxSize, size, s"oversized collector payload: $msg"),
Payload.RawPayload(event.toString().take(maxSize / 10))
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.snowplowanalytics.snowplow.collector.core

import org.specs2.mutable.Specification

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import org.http4s.implicits.http4sLiteralsSyntax
import org.http4s.{Method, Request, RequestCookie, Response, Status}
import org.http4s.Status._

import fs2.{Stream, text}
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {
class RoutesSpec extends Specification {

val collectorService = new Service[IO] {
val service = new IService[IO] {
override def cookie(
queryString: Option[String],
body: IO[Option[String]],
Expand All @@ -30,7 +33,7 @@ class CollectorRoutesSpec extends Specification {

override def determinePath(vendor: String, version: String): String = "/p1/p2"
}
val routes = new CollectorRoutes[IO](collectorService).value
val routes = new Routes(service).value

"The collector route" should {
"respond to the health route with an ok response" in {
Expand Down
Loading

0 comments on commit f06ab45

Please sign in to comment.