Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add get and head endpoints #328

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,48 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs
}

private val cookieRoutes = HttpRoutes.of[F] {
case req @ POST -> Root / vendor / version =>
case req @ (POST | GET | HEAD) -> Root / vendor / version =>
val path = collectorService.determinePath(vendor, version)
val userAgent = extractHeader(req, "User-Agent")
val referer = extractHeader(req, "Referer")
val spAnonymous = extractHeader(req, "SP-Anonymous")
val hostname = req.remoteHost.map(_.map(_.toString))
val ip = req.remoteAddr.map(_.toUriString)

collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = req.remoteHost.map(_.map(_.toString)),
ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
)
req.method match {
case POST =>
collectorService.cookie(
queryString = Some(req.queryString),
body = req.bodyText.compile.string.map(Some(_)),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = false,
doNotTrack = false,
contentType = req.contentType.map(_.value.toLowerCase),
spAnonymous = spAnonymous
)
case GET | HEAD =>
collectorService.cookie(
queryString = Some(req.queryString),
body = Sync[F].delay(None),
path = path,
cookie = None, //TODO: cookie will be added later
userAgent = userAgent,
refererUri = referer,
hostname = hostname,
ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header
request = req,
pixelExpected = true,
doNotTrack = false,
contentType = None,
spAnonymous = spAnonymous
)
}
}

val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import java.util.UUID

import org.apache.commons.codec.binary.Base64

import scala.concurrent.duration._
import scala.collection.JavaConverters._

import cats.effect.{Clock, Sync}
import cats.implicits._

import fs2.Stream

import org.http4s._
import org.http4s.headers._
import org.http4s.implicits._
Expand Down Expand Up @@ -38,13 +42,20 @@ trait Service[F[_]] {
def determinePath(vendor: String, version: String): String
}

object CollectorService {
// Contains an invisible pixel to return for `/i` requests.
val pixel = Base64.decodeBase64("R0lGODlhAQABAPAAAP///wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==")
}

class CollectorService[F[_]: Sync](
config: CollectorConfig,
sinks: CollectorSinks[F],
appName: String,
appVersion: String
) extends Service[F] {

val pixelStream = Stream.iterable[F, Byte](CollectorService.pixel)

// TODO: Add sink type as well
private val collector = s"$appName-$appVersion"

Expand All @@ -70,8 +81,7 @@ class CollectorService[F[_]: Sync](
hostname <- hostname
// TODO: Get ipAsPartitionKey from config
(ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false)
// TODO: nuid should be set properly
nuid = UUID.randomUUID().toString
nuid = UUID.randomUUID().toString // TODO: nuid should be set properly
event = buildEvent(
queryString,
body,
Expand All @@ -93,9 +103,13 @@ class CollectorService[F[_]: Sync](
spAnonymous = spAnonymous,
now = now
)
responseHeaders = Headers(setCookie.toList.map(_.toRaw1))
headerList = List(
setCookie.map(_.toRaw1),
cacheControl(pixelExpected).map(_.toRaw1)
).flatten
responseHeaders = Headers(headerList)
_ <- sinkEvent(event, partitionKey)
} yield buildHttpResponse(responseHeaders)
} yield buildHttpResponse(responseHeaders, pixelExpected)

def determinePath(vendor: String, version: String): String = {
val original = s"/$vendor/$version"
Expand Down Expand Up @@ -135,8 +149,23 @@ class CollectorService[F[_]: Sync](
}

// TODO: Handle necessary cases to build http response in here
def buildHttpResponse(headers: Headers): Response[F] =
Response(status = Ok, headers = headers)
def buildHttpResponse(
headers: Headers,
pixelExpected: Boolean
): Response[F] =
pixelExpected match {
case true =>
Response[F](
headers = headers.put(`Content-Type`(MediaType.image.gif)),
body = pixelStream
)
// See https://github.com/snowplow/snowplow-javascript-tracker/issues/482
case false =>
Response[F](
status = Ok,
headers = headers
).withEntity("ok")
}

// TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers,
// they aren't included in here. It might be good to search for counterparts in Http4s.
Expand All @@ -153,6 +182,12 @@ class CollectorService[F[_]: Sync](
}
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): Option[`Cache-Control`] =
if (pixelExpected)
Some(`Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`))
spenes marked this conversation as resolved.
Show resolved Hide resolved
else None

/** Produces the event to the configured sink. */
def sinkEvent(
event: CollectorPayload,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import scala.collection.mutable.ListBuffer
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.http4s.implicits.http4sLiteralsSyntax
import org.http4s.{Method, Request, RequestCookie, Response, Status}
import com.comcast.ip4s.SocketAddress
import org.http4s.implicits._
import org.http4s._
import org.http4s.headers._
import org.http4s.Status._
import fs2.{Stream, text}
import org.typelevel.ci._
import org.specs2.mutable.Specification

class CollectorRoutesSpec extends Specification {

val collectorService = new Service[IO] {
case class CookieParams(
queryString: Option[String],
body: IO[Option[String]],
path: String,
cookie: Option[RequestCookie],
userAgent: Option[String],
refererUri: Option[String],
hostname: IO[Option[String]],
ip: Option[String],
request: Request[IO],
pixelExpected: Boolean,
doNotTrack: Boolean,
contentType: Option[String],
spAnonymous: Option[String]
)

class TestService() extends Service[IO] {

private val cookieCalls: ListBuffer[CookieParams] = ListBuffer()

def getCookieCalls: List[CookieParams] = cookieCalls.toList

override def cookie(
queryString: Option[String],
body: IO[Option[String]],
Expand All @@ -26,28 +51,115 @@ class CollectorRoutesSpec extends Specification {
contentType: Option[String],
spAnonymous: Option[String]
): IO[Response[IO]] =
IO.pure(Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode)))
IO.delay {
cookieCalls += CookieParams(
queryString,
body,
path,
cookie,
userAgent,
refererUri,
hostname,
ip,
request,
pixelExpected,
doNotTrack,
contentType,
spAnonymous
)
Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode))
}

override def determinePath(vendor: String, version: String): String = "/p1/p2"
}
val routes = new CollectorRoutes[IO](collectorService).value

val testConnection = Request.Connection(
local = SocketAddress.fromStringIp("127.0.0.1:80").get,
remote = SocketAddress.fromStringIp("127.0.0.1:80").get,
secure = false
)

val testHeaders = Headers(
`User-Agent`(ProductId("testUserAgent")),
Referer(Uri.unsafeFromString("example.com")),
Header.Raw(ci"SP-Anonymous", "*"),
`Content-Type`(MediaType.application.json)
)

def createTestServices = {
val collectorService = new TestService()
val routes = new CollectorRoutes[IO](collectorService).value
(collectorService, routes)
}

"The collector route" should {
"respond to the health route with an ok response" in {
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = routes.run(request).unsafeRunSync()
val (_, routes) = createTestServices
val request = Request[IO](method = Method.GET, uri = uri"/health")
val response = routes.run(request).unsafeRunSync()

response.status must beEqualTo(Status.Ok)
response.as[String].unsafeRunSync() must beEqualTo("OK")
}

"respond to the post cookie route with the cookie response" in {
val request = Request[IO](method = Method.POST, uri = uri"/p1/p2")
val (collectorService, routes) = createTestServices

val request = Request[IO](method = Method.POST, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
.withEntity("testBody")
.withHeaders(testHeaders)
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual Some("testBody")
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual false
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual Some("application/json")
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

"respond to the get or head cookie route with the cookie response" in {
def getHeadTest(method: Method) = {
val (collectorService, routes) = createTestServices

val request = Request[IO](method = method, uri = uri"/p3/p4?a=b&c=d")
.withAttribute(Request.Keys.ConnectionInfo, testConnection)
.withEntity("testBody")
.withHeaders(testHeaders)
val response = routes.run(request).unsafeRunSync()

val List(cookieParams) = collectorService.getCookieCalls
cookieParams.queryString shouldEqual Some("a=b&c=d")
cookieParams.body.unsafeRunSync() shouldEqual None
cookieParams.path shouldEqual "/p1/p2"
cookieParams.cookie shouldEqual None
cookieParams.userAgent shouldEqual Some("testUserAgent")
cookieParams.refererUri shouldEqual Some("example.com")
cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost")
cookieParams.ip shouldEqual Some("127.0.0.1")
cookieParams.pixelExpected shouldEqual true
cookieParams.doNotTrack shouldEqual false
cookieParams.contentType shouldEqual None
cookieParams.spAnonymous shouldEqual Some("*")

response.status must beEqualTo(Status.Ok)
response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie")
}

getHeadTest(Method.GET)
getHeadTest(Method.HEAD)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class CollectorServiceSpec extends Specification {
)
val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector")
val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r
val hs = Headers(
`X-Forwarded-For`(IpAddress.fromString("127.0.0.1")),
Cookie(RequestCookie("cookie", "value")),
`Access-Control-Allow-Credentials`()
)

def probeService(): ProbeService = {
val good = new TestSink
Expand Down Expand Up @@ -162,6 +167,30 @@ class CollectorServiceSpec extends Specification {
"image/gif"
).asJava
}

"return necessary cache control headers and respond with pixel when pixelExpected is true" in {
val r = service
.cookie(
queryString = Some("nuid=12"),
body = IO.pure(Some("b")),
path = "p",
cookie = None,
userAgent = None,
refererUri = None,
hostname = IO.pure(Some("h")),
ip = None,
request = Request[IO](),
pixelExpected = true,
doNotTrack = false,
contentType = None,
spAnonymous = Some("*")
)
.unsafeRunSync()
r.headers.get[`Cache-Control`] shouldEqual Some(
`Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`)
)
r.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel
}
}

"buildEvent" in {
Expand Down Expand Up @@ -235,6 +264,20 @@ class CollectorServiceSpec extends Specification {
}
}

"buildHttpResponse" in {
"send back a gif if pixelExpected is true" in {
val res = service.buildHttpResponse(hs, pixelExpected = true)
res.status shouldEqual Status.Ok
res.headers shouldEqual hs.put(`Content-Type`(MediaType.image.gif))
res.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel
}
"send back ok otherwise" in {
val res = service.buildHttpResponse(hs, pixelExpected = false)
res.status shouldEqual Status.Ok
res.bodyText.compile.toList.unsafeRunSync() shouldEqual List("ok")
}
}

"ipAndPartitionkey" in {
"give back the ip and partition key as ip if remote address is defined" in {
val address = Some("127.0.0.1")
Expand Down
Loading