diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala index 01870a242..539a3920f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala @@ -18,27 +18,48 @@ class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDs } private val cookieRoutes = HttpRoutes.of[F] { - case req @ POST -> Root / vendor / version => + case req @ (POST | GET | HEAD) -> Root / vendor / version => val path = collectorService.determinePath(vendor, version) val userAgent = extractHeader(req, "User-Agent") val referer = extractHeader(req, "Referer") val spAnonymous = extractHeader(req, "SP-Anonymous") + val hostname = req.remoteHost.map(_.map(_.toString)) + val ip = req.remoteAddr.map(_.toUriString) - collectorService.cookie( - queryString = Some(req.queryString), - body = req.bodyText.compile.string.map(Some(_)), - path = path, - cookie = None, //TODO: cookie will be added later - userAgent = userAgent, - refererUri = referer, - hostname = req.remoteHost.map(_.map(_.toString)), - ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header - request = req, - pixelExpected = false, - doNotTrack = false, - contentType = req.contentType.map(_.value.toLowerCase), - spAnonymous = spAnonymous - ) + req.method match { + case POST => + collectorService.cookie( + queryString = Some(req.queryString), + body = req.bodyText.compile.string.map(Some(_)), + path = path, + cookie = None, //TODO: cookie will be added later + userAgent = userAgent, + refererUri = referer, + hostname = hostname, + ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = req.contentType.map(_.value.toLowerCase), + spAnonymous = spAnonymous + ) + case GET | HEAD => + collectorService.cookie( + queryString = Some(req.queryString), + body = Sync[F].delay(None), + path = path, + cookie = None, //TODO: cookie will be added later + userAgent = userAgent, + refererUri = referer, + hostname = hostname, + ip = ip, // TODO: Do not set the ip if request contains SP-Anonymous header + request = req, + pixelExpected = true, + doNotTrack = false, + contentType = None, + spAnonymous = spAnonymous + ) + } } val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 75cddc2e9..85877da27 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -2,12 +2,16 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import java.util.UUID +import org.apache.commons.codec.binary.Base64 + import scala.concurrent.duration._ import scala.collection.JavaConverters._ import cats.effect.{Clock, Sync} import cats.implicits._ +import fs2.Stream + import org.http4s._ import org.http4s.headers._ import org.http4s.implicits._ @@ -38,6 +42,11 @@ trait Service[F[_]] { def determinePath(vendor: String, version: String): String } +object CollectorService { + // Contains an invisible pixel to return for `/i` requests. + val pixel = Base64.decodeBase64("R0lGODlhAQABAPAAAP///wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==") +} + class CollectorService[F[_]: Sync]( config: CollectorConfig, sinks: CollectorSinks[F], @@ -45,6 +54,8 @@ class CollectorService[F[_]: Sync]( appVersion: String ) extends Service[F] { + val pixelStream = Stream.iterable[F, Byte](CollectorService.pixel) + // TODO: Add sink type as well private val collector = s"$appName-$appVersion" @@ -70,8 +81,7 @@ class CollectorService[F[_]: Sync]( hostname <- hostname // TODO: Get ipAsPartitionKey from config (ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false) - // TODO: nuid should be set properly - nuid = UUID.randomUUID().toString + nuid = UUID.randomUUID().toString // TODO: nuid should be set properly event = buildEvent( queryString, body, @@ -93,9 +103,13 @@ class CollectorService[F[_]: Sync]( spAnonymous = spAnonymous, now = now ) - responseHeaders = Headers(setCookie.toList.map(_.toRaw1)) + headerList = List( + setCookie.map(_.toRaw1), + cacheControl(pixelExpected).map(_.toRaw1) + ).flatten + responseHeaders = Headers(headerList) _ <- sinkEvent(event, partitionKey) - } yield buildHttpResponse(responseHeaders) + } yield buildHttpResponse(responseHeaders, pixelExpected) def determinePath(vendor: String, version: String): String = { val original = s"/$vendor/$version" @@ -135,8 +149,23 @@ class CollectorService[F[_]: Sync]( } // TODO: Handle necessary cases to build http response in here - def buildHttpResponse(headers: Headers): Response[F] = - Response(status = Ok, headers = headers) + def buildHttpResponse( + headers: Headers, + pixelExpected: Boolean + ): Response[F] = + pixelExpected match { + case true => + Response[F]( + headers = headers.put(`Content-Type`(MediaType.image.gif)), + body = pixelStream + ) + // See https://github.com/snowplow/snowplow-javascript-tracker/issues/482 + case false => + Response[F]( + status = Ok, + headers = headers + ).withEntity("ok") + } // TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers, // they aren't included in here. It might be good to search for counterparts in Http4s. @@ -153,6 +182,12 @@ class CollectorService[F[_]: Sync]( } } + /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ + def cacheControl(pixelExpected: Boolean): Option[`Cache-Control`] = + if (pixelExpected) + Some(`Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`)) + else None + /** Produces the event to the configured sink. */ def sinkEvent( event: CollectorPayload, diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala index 5d01f34fa..d76c640ff 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala @@ -1,16 +1,41 @@ package com.snowplowanalytics.snowplow.collectors.scalastream +import scala.collection.mutable.ListBuffer import cats.effect.IO import cats.effect.unsafe.implicits.global -import org.http4s.implicits.http4sLiteralsSyntax -import org.http4s.{Method, Request, RequestCookie, Response, Status} +import com.comcast.ip4s.SocketAddress +import org.http4s.implicits._ +import org.http4s._ +import org.http4s.headers._ import org.http4s.Status._ import fs2.{Stream, text} +import org.typelevel.ci._ import org.specs2.mutable.Specification class CollectorRoutesSpec extends Specification { - val collectorService = new Service[IO] { + case class CookieParams( + queryString: Option[String], + body: IO[Option[String]], + path: String, + cookie: Option[RequestCookie], + userAgent: Option[String], + refererUri: Option[String], + hostname: IO[Option[String]], + ip: Option[String], + request: Request[IO], + pixelExpected: Boolean, + doNotTrack: Boolean, + contentType: Option[String], + spAnonymous: Option[String] + ) + + class TestService() extends Service[IO] { + + private val cookieCalls: ListBuffer[CookieParams] = ListBuffer() + + def getCookieCalls: List[CookieParams] = cookieCalls.toList + override def cookie( queryString: Option[String], body: IO[Option[String]], @@ -26,28 +51,115 @@ class CollectorRoutesSpec extends Specification { contentType: Option[String], spAnonymous: Option[String] ): IO[Response[IO]] = - IO.pure(Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode))) + IO.delay { + cookieCalls += CookieParams( + queryString, + body, + path, + cookie, + userAgent, + refererUri, + hostname, + ip, + request, + pixelExpected, + doNotTrack, + contentType, + spAnonymous + ) + Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode)) + } override def determinePath(vendor: String, version: String): String = "/p1/p2" } - val routes = new CollectorRoutes[IO](collectorService).value + + val testConnection = Request.Connection( + local = SocketAddress.fromStringIp("127.0.0.1:80").get, + remote = SocketAddress.fromStringIp("127.0.0.1:80").get, + secure = false + ) + + val testHeaders = Headers( + `User-Agent`(ProductId("testUserAgent")), + Referer(Uri.unsafeFromString("example.com")), + Header.Raw(ci"SP-Anonymous", "*"), + `Content-Type`(MediaType.application.json) + ) + + def createTestServices = { + val collectorService = new TestService() + val routes = new CollectorRoutes[IO](collectorService).value + (collectorService, routes) + } "The collector route" should { "respond to the health route with an ok response" in { - val request = Request[IO](method = Method.GET, uri = uri"/health") - val response = routes.run(request).unsafeRunSync() + val (_, routes) = createTestServices + val request = Request[IO](method = Method.GET, uri = uri"/health") + val response = routes.run(request).unsafeRunSync() response.status must beEqualTo(Status.Ok) response.as[String].unsafeRunSync() must beEqualTo("OK") } "respond to the post cookie route with the cookie response" in { - val request = Request[IO](method = Method.POST, uri = uri"/p1/p2") + val (collectorService, routes) = createTestServices + + val request = Request[IO](method = Method.POST, uri = uri"/p3/p4?a=b&c=d") + .withAttribute(Request.Keys.ConnectionInfo, testConnection) + .withEntity("testBody") + .withHeaders(testHeaders) val response = routes.run(request).unsafeRunSync() + val List(cookieParams) = collectorService.getCookieCalls + cookieParams.queryString shouldEqual Some("a=b&c=d") + cookieParams.body.unsafeRunSync() shouldEqual Some("testBody") + cookieParams.path shouldEqual "/p1/p2" + cookieParams.cookie shouldEqual None + cookieParams.userAgent shouldEqual Some("testUserAgent") + cookieParams.refererUri shouldEqual Some("example.com") + cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost") + cookieParams.ip shouldEqual Some("127.0.0.1") + cookieParams.pixelExpected shouldEqual false + cookieParams.doNotTrack shouldEqual false + cookieParams.contentType shouldEqual Some("application/json") + cookieParams.spAnonymous shouldEqual Some("*") + response.status must beEqualTo(Status.Ok) response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie") } + + "respond to the get or head cookie route with the cookie response" in { + def getHeadTest(method: Method) = { + val (collectorService, routes) = createTestServices + + val request = Request[IO](method = method, uri = uri"/p3/p4?a=b&c=d") + .withAttribute(Request.Keys.ConnectionInfo, testConnection) + .withEntity("testBody") + .withHeaders(testHeaders) + val response = routes.run(request).unsafeRunSync() + + val List(cookieParams) = collectorService.getCookieCalls + cookieParams.queryString shouldEqual Some("a=b&c=d") + cookieParams.body.unsafeRunSync() shouldEqual None + cookieParams.path shouldEqual "/p1/p2" + cookieParams.cookie shouldEqual None + cookieParams.userAgent shouldEqual Some("testUserAgent") + cookieParams.refererUri shouldEqual Some("example.com") + cookieParams.hostname.unsafeRunSync() shouldEqual Some("localhost") + cookieParams.ip shouldEqual Some("127.0.0.1") + cookieParams.pixelExpected shouldEqual true + cookieParams.doNotTrack shouldEqual false + cookieParams.contentType shouldEqual None + cookieParams.spAnonymous shouldEqual Some("*") + + response.status must beEqualTo(Status.Ok) + response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie") + } + + getHeadTest(Method.GET) + getHeadTest(Method.HEAD) + } } } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index 08720df71..561f18fe0 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -26,6 +26,11 @@ class CollectorServiceSpec extends Specification { ) val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r + val hs = Headers( + `X-Forwarded-For`(IpAddress.fromString("127.0.0.1")), + Cookie(RequestCookie("cookie", "value")), + `Access-Control-Allow-Credentials`() + ) def probeService(): ProbeService = { val good = new TestSink @@ -162,6 +167,30 @@ class CollectorServiceSpec extends Specification { "image/gif" ).asJava } + + "return necessary cache control headers and respond with pixel when pixelExpected is true" in { + val r = service + .cookie( + queryString = Some("nuid=12"), + body = IO.pure(Some("b")), + path = "p", + cookie = None, + userAgent = None, + refererUri = None, + hostname = IO.pure(Some("h")), + ip = None, + request = Request[IO](), + pixelExpected = true, + doNotTrack = false, + contentType = None, + spAnonymous = Some("*") + ) + .unsafeRunSync() + r.headers.get[`Cache-Control`] shouldEqual Some( + `Cache-Control`(CacheDirective.`no-cache`(), CacheDirective.`no-store`, CacheDirective.`must-revalidate`) + ) + r.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel + } } "buildEvent" in { @@ -235,6 +264,20 @@ class CollectorServiceSpec extends Specification { } } + "buildHttpResponse" in { + "send back a gif if pixelExpected is true" in { + val res = service.buildHttpResponse(hs, pixelExpected = true) + res.status shouldEqual Status.Ok + res.headers shouldEqual hs.put(`Content-Type`(MediaType.image.gif)) + res.body.compile.toList.unsafeRunSync().toArray shouldEqual CollectorService.pixel + } + "send back ok otherwise" in { + val res = service.buildHttpResponse(hs, pixelExpected = false) + res.status shouldEqual Status.Ok + res.bodyText.compile.toList.unsafeRunSync() shouldEqual List("ok") + } + } + "ipAndPartitionkey" in { "give back the ip and partition key as ip if remote address is defined" in { val address = Some("127.0.0.1")