Skip to content

Commit

Permalink
Add post endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Aug 7, 2023
1 parent 21ec804 commit a381e44
Show file tree
Hide file tree
Showing 12 changed files with 896 additions and 14 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ lazy val http4s = project
Dependencies.Libraries.http4sBlaze,
Dependencies.Libraries.http4sNetty,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.thrift,
Dependencies.Libraries.badRows,
Dependencies.Libraries.collectorPayload,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.specs2
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
import java.net.InetSocketAddress
import scala.concurrent.duration.{DurationLong, FiniteDuration}

import com.snowplowanalytics.snowplow.collectors.scalastream.model._

object CollectorApp {

implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = {
def run[F[_]: Async](
mkGood: Resource[F, Sink[F]],
mkBad: Resource[F, Sink[F]],
config: CollectorConfig,
appName: String,
appVersion: String
): F[ExitCode] = {
val resources = for {
bad <- mkBad
good <- mkGood
_ <- withGracefulShutdown(610.seconds) {
buildHttpServer[F](new CollectorRoutes[F](good, bad).value)
val sinks = CollectorSinks(good, bad)
val collectorService: CollectorService[F] = new CollectorService[F](config, sinks, appName, appVersion)
buildHttpServer[F](new CollectorRoutes[F](collectorService).value)
}
} yield ()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,50 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.implicits._
import cats.effect.Sync
import org.http4s.{HttpApp, HttpRoutes}
import org.typelevel.ci.CIString
import org.http4s.{HttpApp, HttpRoutes, Request}
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns

class CollectorRoutes[F[_]: Sync](good: Sink[F], bad: Sink[F]) extends Http4sDsl[F] {
class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDsl[F] {

val _ = (good, bad)
implicit val dns: Dns[F] = Dns.forSync[F]

lazy val value: HttpApp[F] = HttpRoutes
private val healthRoutes = HttpRoutes
.of[F] {
case GET -> Root / "health" =>
Ok("OK")
}
.orNotFound

private val cookieRoutes = HttpRoutes
.of[F] {
case req@POST -> Root / vendor / version =>
val path = collectorService.determinePath(vendor, version)
val userAgent = extractHeader(req, "User-Agent")
val referer = extractHeader(req, "Referer")
val spAnonymous = extractHeader(req, "SP-Anonymous")

collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = req.remoteHost.map(_.map(_.toString)),
ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
)
}

val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound

def extractHeader(req:Request[F], headerName: String): Option[String] =
req.headers.get(CIString(headerName)).map(_.head.value)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import java.util.UUID

import scala.collection.JavaConverters._

import cats.effect.Sync
import cats.implicits._

import org.http4s.{Request, Response, RequestCookie}
import org.http4s.Status._

import org.typelevel.ci._

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

import com.snowplowanalytics.snowplow.collectors.scalastream.model._

trait Service[F[_]] {
def cookie(
queryString: Option[String],
body: F[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: F[Option[String]],
ip: Option[String],
request: Request[F],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String] = None,
spAnonymous: Option[String] = None
): F[Response[F]]
def determinePath(vendor: String, version: String): String
}

class CollectorService[F[_]: Sync](
config: CollectorConfig,
sinks: CollectorSinks[F],
appName: String,
appVersion: String
) extends Service[F] {

// TODO: Add sink type as well
private val collector = s"$appName-$appVersion"

private val splitBatch: SplitBatch = SplitBatch(appName, appVersion)

def cookie(
queryString: Option[String],
body: F[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: F[Option[String]],
ip: Option[String],
request: Request[F],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String] = None,
spAnonymous: Option[String] = None
): F[Response[F]] =
for {
body <- body
hostname <- hostname
// TODO: Get ipAsPartitionKey from config
(ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false)
// TODO: nuid should be set properly
nuid = UUID.randomUUID().toString
event = buildEvent(
queryString,
body,
path,
userAgent,
refererUri,
hostname,
ipAddress,
nuid,
contentType,
headers(request, spAnonymous)
)
_ <- sinkEvent(event, partitionKey)
} yield buildHttpResponse

def determinePath(vendor: String, version: String): String = {
val original = s"/$vendor/$version"
config.paths.getOrElse(original, original)
}

/** Builds a raw event from an Http request. */
def buildEvent(
queryString: Option[String],
body: Option[String],
path: String,
userAgent: Option[String],
refererUri: Option[String],
hostname: Option[String],
ipAddress: String,
networkUserId: String,
contentType: Option[String],
headers: List[String]
): CollectorPayload = {
val e = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ipAddress,
System.currentTimeMillis,
"UTF-8",
collector
)
queryString.foreach(e.querystring = _)
body.foreach(e.body = _)
e.path = path
userAgent.foreach(e.userAgent = _)
refererUri.foreach(e.refererUri = _)
hostname.foreach(e.hostname = _)
e.networkUserId = networkUserId
e.headers = (headers ++ contentType).asJava
contentType.foreach(e.contentType = _)
e
}

// TODO: Handle necessary cases to build http response in here
def buildHttpResponse: Response[F] = Response(status = Ok)

// TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers,
// they aren't included in here. It might be good to search for counterparts in Http4s.
/** If the SP-Anonymous header is not present, retrieves all headers
* from the request.
* If the SP-Anonymous header is present, additionally filters out the
* X-Forwarded-For, X-Real-IP and Cookie headers as well.
*/
def headers(request: Request[F], spAnonymous: Option[String]): List[String] =
request.headers.headers.flatMap { h =>
h.name match {
case ci"X-Forwarded-For" | ci"X-Real-Ip" | ci"Cookie" if spAnonymous.isDefined => None
case _ => Some(h.toString())
}
}

/** Produces the event to the configured sink. */
def sinkEvent(
event: CollectorPayload,
partitionKey: String
): F[Unit] =
for {
// Split events into Good and Bad
eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes))
// Send events to respective sinks
_ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey)
_ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
} yield ()

/**
* Gets the IP from a RemoteAddress. If ipAsPartitionKey is false, a UUID will be generated.
*
* @param remoteAddress Address extracted from an HTTP request
* @param ipAsPartitionKey Whether to use the ip as a partition key or a random UUID
* @return a tuple of ip (unknown if it couldn't be extracted) and partition key
*/
def ipAndPartitionKey(
ipAddress: Option[String],
ipAsPartitionKey: Boolean
): (String, String) =
ipAddress match {
case None => ("unknown", UUID.randomUUID.toString)
case Some(ip) => (ip, if (ipAsPartitionKey) ip else UUID.randomUUID.toString)
}
}
Loading

0 comments on commit a381e44

Please sign in to comment.