Skip to content

Commit

Permalink
Add pixel endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Aug 11, 2023
1 parent b57573c commit ffc369c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.implicits._
import cats.effect.Sync
import org.typelevel.ci.CIString
import org.http4s.{HttpApp, HttpRoutes, Request}
import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns
Expand All @@ -19,51 +18,41 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs

private val cookieRoutes = HttpRoutes.of[F] {
case req @ (POST | GET | HEAD) -> Root / vendor / version =>
val path = collectorService.determinePath(vendor, version)
val userAgent = extractHeader(req, "User-Agent")
val referer = extractHeader(req, "Referer")
val spAnonymous = extractHeader(req, "SP-Anonymous")
val hostname = req.remoteHost.map(_.map(_.toString))
val ip = req.remoteAddr.map(_.toUriString)

val path = collectorService.determinePath(vendor, version)
req.method match {
case POST =>
collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
contentType = req.contentType.map(_.value.toLowerCase)
)
case GET | HEAD =>
collectorService.cookie(
queryString = Some(req.queryString),
body = Sync[F].delay(None),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = true,
doNotTrack = false,
contentType = None,
spAnonymous = spAnonymous
contentType = None
)
}

case req @ (GET | HEAD) -> Root / ("ice.png" | "i") =>
collectorService.cookie(
body = Sync[F].delay(None),
path = req.pathInfo.renderString,
cookie = None, //TODO: cookie will be added later
request = req,
pixelExpected = true,
doNotTrack = false,
contentType = None
)
}

val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound

def extractHeader(req: Request[F], headerName: String): Option[String] =
req.headers.get(CIString(headerName)).map(_.head.value)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,21 @@ import org.http4s.Status._

import org.typelevel.ci._

import com.comcast.ip4s.Dns

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

import com.snowplowanalytics.snowplow.collectors.scalastream.model._

trait Service[F[_]] {
def cookie(
queryString: Option[String],
body: F[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: F[Option[String]],
ip: Option[String],
request: Request[F],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String] = None,
spAnonymous: Option[String] = None
contentType: Option[String] = None
): F[Response[F]]
def determinePath(vendor: String, version: String): String
}
Expand All @@ -54,6 +50,8 @@ class CollectorService[F[_]: Sync](
appVersion: String
) extends Service[F] {

implicit val dns: Dns[F] = Dns.forSync[F]

val pixelStream = Stream.iterable[F, Byte](CollectorService.pixel)

// TODO: Add sink type as well
Expand All @@ -62,23 +60,22 @@ class CollectorService[F[_]: Sync](
private val splitBatch: SplitBatch = SplitBatch(appName, appVersion)

def cookie(
queryString: Option[String],
body: F[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: F[Option[String]],
ip: Option[String],
request: Request[F],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String] = None,
spAnonymous: Option[String] = None
contentType: Option[String] = None
): F[Response[F]] =
for {
body <- body
hostname <- hostname
hostname <- request.remoteHost.map(_.map(_.toString))
userAgent = extractHeader(request, "User-Agent")
refererUri = extractHeader(request, "Referer")
spAnonymous = extractHeader(request, "SP-Anonymous")
ip = request.remoteAddr.map(_.toUriString)
queryString = Some(request.queryString)
// TODO: Get ipAsPartitionKey from config
(ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false)
nuid = UUID.randomUUID().toString // TODO: nuid should be set properly
Expand Down Expand Up @@ -116,6 +113,9 @@ class CollectorService[F[_]: Sync](
config.paths.getOrElse(original, original)
}

def extractHeader(req: Request[F], headerName: String): Option[String] =
req.headers.get(CIString(headerName)).map(_.head.value)

/** Builds a raw event from an Http request. */
def buildEvent(
queryString: Option[String],
Expand Down Expand Up @@ -163,8 +163,9 @@ class CollectorService[F[_]: Sync](
case false =>
Response[F](
status = Ok,
headers = headers
).withEntity("ok")
headers = headers,
body = Stream.emit("ok").through(fs2.text.utf8.encode)
)
}

// TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,23 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
import scala.collection.mutable.ListBuffer
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.comcast.ip4s.SocketAddress
import org.http4s.implicits._
import org.http4s._
import org.http4s.headers._
import org.http4s.Status._
import fs2.{Stream, text}
import org.typelevel.ci._
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {

case class CookieParams(
queryString: Option[String],
body: IO[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: IO[Option[String]],
ip: Option[String],
request: Request[IO],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String],
spAnonymous: Option[String]
contentType: Option[String]
)

class TestService() extends Service[IO] {
Expand All @@ -37,55 +29,30 @@ class CollectorRoutesSpec extends Specification {
def getCookieCalls: List[CookieParams] = cookieCalls.toList

override def cookie(
queryString: Option[String],
body: IO[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: IO[Option[String]],
ip: Option[String],
request: Request[IO],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String],
spAnonymous: Option[String]
contentType: Option[String]
): IO[Response[IO]] =
IO.delay {
cookieCalls += CookieParams(
queryString,
body,
path,
cookie,
userAgent,
refererUri,
hostname,
ip,
request,
pixelExpected,
doNotTrack,
contentType,
spAnonymous
contentType
)
Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode))
}

override def determinePath(vendor: String, version: String): String = "/p1/p2"
}

val testConnection = Request.Connection(
local = SocketAddress.fromStringIp("127.0.0.1:80").get,
remote = SocketAddress.fromStringIp("127.0.0.1:80").get,
secure = false
)

val testHeaders = Headers(
`User-Agent`(ProductId("testUserAgent")),
Referer(Uri.unsafeFromString("example.com")),
Header.Raw(ci"SP-Anonymous", "*"),
`Content-Type`(MediaType.application.json)
)

def createTestServices = {
val collectorService = new TestService()
val routes = new CollectorRoutes[IO](collectorService).value
Expand All @@ -105,60 +72,69 @@ class CollectorRoutesSpec extends Specification {
"respond to the post cookie route with the cookie response" in {
val (collectorService, routes) = createTestServices

val request = Request[IO](method = Method.POST, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
val request = Request[IO](method = Method.POST, uri = uri"/p3/p4")
.withEntity("testBody")
.withHeaders(testHeaders)
.withHeaders(`Content-Type`(MediaType.application.json))
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual Some("testBody")
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual false
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual Some("application/json")
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

"respond to the get or head cookie route with the cookie response" in {
def getHeadTest(method: Method) = {
def test(method: Method) = {
val (collectorService, routes) = createTestServices

val request = Request[IO](method = method, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
.withEntity("testBody")
.withHeaders(testHeaders)
val request = Request[IO](method = method, uri = uri"/p3/p4").withEntity("testBody")
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual None
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual true
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual None
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

getHeadTest(Method.GET)
getHeadTest(Method.HEAD)
test(Method.GET)
test(Method.HEAD)
}

"respond to the get or head pixel route with the cookie response" in {
def test(method: Method, uri: String) = {
val (collectorService, routes) = createTestServices

val request = Request[IO](method = method, uri = Uri.unsafeFromString(uri)).withEntity("testBody")
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.body.unsafeRunSync() shouldEqual None
cookieParams.path shouldEqual uri
cookieParams.cookie shouldEqual None
cookieParams.pixelExpected shouldEqual true
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual None

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

test(Method.GET, "/i")
test(Method.HEAD, "/i")
test(Method.GET, "/ice.png")
test(Method.HEAD, "/ice.png")
}
}

Expand Down
Loading

0 comments on commit ffc369c

Please sign in to comment.