diff --git a/build.sbt b/build.sbt index 20198ca56..78112e2bf 100644 --- a/build.sbt +++ b/build.sbt @@ -130,6 +130,9 @@ lazy val http4s = project Dependencies.Libraries.http4sBlaze, Dependencies.Libraries.http4sNetty, Dependencies.Libraries.log4cats, + Dependencies.Libraries.thrift, + Dependencies.Libraries.badRows, + Dependencies.Libraries.collectorPayload, Dependencies.Libraries.slf4j, Dependencies.Libraries.specs2 ) diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala index 3f0910287..2e3c65318 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorApp.scala @@ -16,17 +16,27 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress import scala.concurrent.duration.{DurationLong, FiniteDuration} +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ + object CollectorApp { implicit private def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - def run[F[_]: Async](mkGood: Resource[F, Sink[F]], mkBad: Resource[F, Sink[F]]): F[ExitCode] = { + def run[F[_]: Async]( + mkGood: Resource[F, Sink[F]], + mkBad: Resource[F, Sink[F]], + config: CollectorConfig, + appName: String, + appVersion: String + ): F[ExitCode] = { val resources = for { bad <- mkBad good <- mkGood _ <- withGracefulShutdown(610.seconds) { - buildHttpServer[F](new CollectorRoutes[F](good, bad).value) + val sinks = CollectorSinks(good, bad) + val collectorService: CollectorService[F] = new CollectorService[F](config, sinks, appName, appVersion) + buildHttpServer[F](new CollectorRoutes[F](collectorService).value) } } yield () diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala index 3413e161e..7cd76a3d7 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala @@ -1,17 +1,50 @@ package com.snowplowanalytics.snowplow.collectors.scalastream +import cats.implicits._ import cats.effect.Sync -import org.http4s.{HttpApp, HttpRoutes} +import org.typelevel.ci.CIString +import org.http4s.{HttpApp, HttpRoutes, Request} import org.http4s.dsl.Http4sDsl +import org.http4s.implicits._ +import com.comcast.ip4s.Dns -class CollectorRoutes[F[_]: Sync](good: Sink[F], bad: Sink[F]) extends Http4sDsl[F] { +class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDsl[F] { - val _ = (good, bad) + implicit val dns: Dns[F] = Dns.forSync[F] - lazy val value: HttpApp[F] = HttpRoutes + private val healthRoutes = HttpRoutes .of[F] { case GET -> Root / "health" => Ok("OK") } - .orNotFound + + private val cookieRoutes = HttpRoutes + .of[F] { + case req@POST -> Root / vendor / version => + val path = collectorService.determinePath(vendor, version) + val userAgent = extractHeader(req, "User-Agent") + val referer = extractHeader(req, "Referer") + val spAnonymous = extractHeader(req, "SP-Anonymous") + + collectorService.cookie( + queryString = Some(req.queryString), + body = req.bodyText.compile.string.map(Some(_)), + path = path, + cookie = None, //TODO: cookie will be added later + userAgent = userAgent, + refererUri = referer, + hostname = req.remoteHost.map(_.map(_.toString)), + ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = req.contentType.map(_.value.toLowerCase), + spAnonymous = spAnonymous + ) + } + + val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound + + def extractHeader(req:Request[F], headerName: String): Option[String] = + req.headers.get(CIString(headerName)).map(_.head.value) } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala new file mode 100644 index 000000000..55a7d66e5 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -0,0 +1,170 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import java.util.UUID + +import scala.collection.JavaConverters._ + +import cats.effect.Sync +import cats.implicits._ + +import org.http4s.{Request, Response, RequestCookie} +import org.http4s.Status._ + +import org.typelevel.ci._ + +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload + +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ + +trait Service[F[_]] { + def cookie( + queryString: Option[String], + body: F[Option[String]], + path: String, + cookie: Option[RequestCookie], + userAgent: Option[String], + refererUri: Option[String], + hostname: F[Option[String]], + ip: Option[String], + request: Request[F], + pixelExpected: Boolean, + doNotTrack: Boolean, + contentType: Option[String] = None, + spAnonymous: Option[String] = None + ): F[Response[F]] + def determinePath(vendor: String, version: String): String +} + +class CollectorService[F[_]: Sync]( + config: CollectorConfig, + sinks: CollectorSinks[F], + appName: String, + appVersion: String +) extends Service[F] { + + // TODO: Add sink type as well + private val collector = s"$appName-$appVersion" + + private val splitBatch: SplitBatch = SplitBatch(appName, appVersion) + + def cookie( + queryString: Option[String], + body: F[Option[String]], + path: String, + cookie: Option[RequestCookie], + userAgent: Option[String], + refererUri: Option[String], + hostname: F[Option[String]], + ip: Option[String], + request: Request[F], + pixelExpected: Boolean, + doNotTrack: Boolean, + contentType: Option[String] = None, + spAnonymous: Option[String] = None + ): F[Response[F]] = + for { + body <- body + hostname <- hostname + // TODO: Get ipAsPartitionKey from config + (ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false) + // TODO: nuid should be set properly + nuid = UUID.randomUUID().toString + event = buildEvent( + queryString, + body, + path, + userAgent, + refererUri, + hostname, + ipAddress, + nuid, + contentType, + headers(request, spAnonymous) + ) + _ <- sinkEvent(event, partitionKey) + } yield buildHttpResponse + + def determinePath(vendor: String, version: String): String = { + val original = s"/$vendor/$version" + config.paths.getOrElse(original, original) + } + + /** Builds a raw event from an Http request. */ + def buildEvent( + queryString: Option[String], + body: Option[String], + path: String, + userAgent: Option[String], + refererUri: Option[String], + hostname: Option[String], + ipAddress: String, + networkUserId: String, + contentType: Option[String], + headers: List[String] + ): CollectorPayload = { + val e = new CollectorPayload( + "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", + ipAddress, + System.currentTimeMillis, + "UTF-8", + collector + ) + queryString.foreach(e.querystring = _) + body.foreach(e.body = _) + e.path = path + userAgent.foreach(e.userAgent = _) + refererUri.foreach(e.refererUri = _) + hostname.foreach(e.hostname = _) + e.networkUserId = networkUserId + e.headers = (headers ++ contentType).asJava + contentType.foreach(e.contentType = _) + e + } + + // TODO: Handle necessary cases to build http response in here + def buildHttpResponse: Response[F] = Response(status = Ok) + + // TODO: Since Remote-Address and Raw-Request-URI is akka-specific headers, + // they aren't included in here. It might be good to search for counterparts in Http4s. + /** If the SP-Anonymous header is not present, retrieves all headers + * from the request. + * If the SP-Anonymous header is present, additionally filters out the + * X-Forwarded-For, X-Real-IP and Cookie headers as well. + */ + def headers(request: Request[F], spAnonymous: Option[String]): List[String] = + request.headers.headers.flatMap { h => + h.name match { + case ci"X-Forwarded-For" | ci"X-Real-Ip" | ci"Cookie" if spAnonymous.isDefined => None + case _ => Some(h.toString()) + } + } + + /** Produces the event to the configured sink. */ + def sinkEvent( + event: CollectorPayload, + partitionKey: String + ): F[Unit] = + for { + // Split events into Good and Bad + eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) + // Send events to respective sinks + _ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey) + _ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + } yield () + + /** + * Gets the IP from a RemoteAddress. If ipAsPartitionKey is false, a UUID will be generated. + * + * @param remoteAddress Address extracted from an HTTP request + * @param ipAsPartitionKey Whether to use the ip as a partition key or a random UUID + * @return a tuple of ip (unknown if it couldn't be extracted) and partition key + */ + def ipAndPartitionKey( + ipAddress: Option[String], + ipAsPartitionKey: Boolean + ): (String, String) = + ipAddress match { + case None => ("unknown", UUID.randomUUID.toString) + case Some(ip) => (ip, if (ipAsPartitionKey) ip else UUID.randomUUID.toString) + } +} diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala new file mode 100644 index 000000000..ca977fa35 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatch.scala @@ -0,0 +1,153 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets.UTF_8 +import java.time.Instant +import org.apache.thrift.TSerializer + +import cats.syntax.either._ +import io.circe.Json +import io.circe.parser._ +import io.circe.syntax._ + +import com.snowplowanalytics.iglu.core._ +import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ +import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ + +/** Object handling splitting an array of strings correctly */ +case class SplitBatch(appName: String, appVersion: String) { + + // Serialize Thrift CollectorPayload objects + val ThriftSerializer = new ThreadLocal[TSerializer] { + override def initialValue = new TSerializer() + } + + /** + * Split a list of strings into batches, none of them exceeding a given size + * Input strings exceeding the given size end up in the failedBigEvents field of the result + * @param input List of strings + * @param maximum No good batch can exceed this size + * @param joinSize Constant to add to the size of the string representing the additional comma + * needed to join separate event JSONs in a single array + * @return split batch containing list of good batches and list of events that were too big + */ + def split(input: List[Json], maximum: Int, joinSize: Int = 1): SplitBatchResult = { + @scala.annotation.tailrec + def iterbatch( + l: List[Json], + currentBatch: List[Json], + currentTotal: Long, + acc: List[List[Json]], + failedBigEvents: List[Json] + ): SplitBatchResult = l match { + case Nil => + currentBatch match { + case Nil => SplitBatchResult(acc, failedBigEvents) + case nonemptyBatch => SplitBatchResult(nonemptyBatch :: acc, failedBigEvents) + } + case h :: t => + val headSize = getSize(h.noSpaces) + if (headSize + joinSize > maximum) { + iterbatch(t, currentBatch, currentTotal, acc, h :: failedBigEvents) + } else if (headSize + currentTotal + joinSize > maximum) { + iterbatch(l, Nil, 0, currentBatch :: acc, failedBigEvents) + } else { + iterbatch(t, h :: currentBatch, headSize + currentTotal + joinSize, acc, failedBigEvents) + } + } + + iterbatch(input, Nil, 0, Nil, Nil) + } + + /** + * If the CollectorPayload is too big to fit in a single record, attempt to split it into + * multiple records. + * @param event Incoming CollectorPayload + * @return a List of Good and Bad events + */ + def splitAndSerializePayload(event: CollectorPayload, maxBytes: Int): EventSerializeResult = { + val serializer = ThriftSerializer.get() + val everythingSerialized = serializer.serialize(event) + val wholeEventBytes = getSize(everythingSerialized) + + // If the event is below the size limit, no splitting is necessary + if (wholeEventBytes < maxBytes) { + EventSerializeResult(List(everythingSerialized), Nil) + } else { + (for { + body <- Option(event.getBody).toRight("GET requests cannot be split") + children <- splitBody(body) + initialBodyDataBytes = getSize(Json.arr(children._2: _*).noSpaces) + _ <- Either.cond[String, Unit]( + wholeEventBytes - initialBodyDataBytes < maxBytes, + (), + "cannot split this POST request because event without \"data\" field is still too big" + ) + splitted = split(children._2, maxBytes - wholeEventBytes + initialBodyDataBytes) + goodSerialized = serializeBatch(serializer, event, splitted.goodBatches, children._1) + badList = splitted.failedBigEvents.map { e => + val msg = "this POST request split is still too large" + oversizedPayload(event, getSize(e), maxBytes, msg) + } + } yield EventSerializeResult(goodSerialized, badList)).fold({ msg => + val tooBigPayload = oversizedPayload(event, wholeEventBytes, maxBytes, msg) + EventSerializeResult(Nil, List(tooBigPayload)) + }, identity) + } + } + + def splitBody(body: String): Either[String, (SchemaKey, List[Json])] = + for { + json <- parse(body).leftMap(e => s"cannot split POST requests which are not json ${e.getMessage}") + sdd <- json + .as[SelfDescribingData[Json]] + .leftMap(e => s"cannot split POST requests which are not self-describing ${e.getMessage}") + array <- sdd.data.asArray.toRight("cannot split POST requests which do not contain a data array") + } yield (sdd.schema, array.toList) + + /** + * Creates a bad row while maintaining a truncation of the original payload to ease debugging. + * Keeps a tenth of the original payload. + * @param event original payload + * @param size size of the oversized payload + * @param maxSize maximum size allowed + * @param msg error message + * @return the created bad rows as json + */ + private def oversizedPayload( + event: CollectorPayload, + size: Int, + maxSize: Int, + msg: String + ): Array[Byte] = + BadRow + .SizeViolation( + Processor(appName, appVersion), + Failure.SizeViolation(Instant.now(), maxSize, size, s"oversized collector payload: $msg"), + Payload.RawPayload(event.toString().take(maxSize / 10)) + ) + .compact + .getBytes(UTF_8) + + private def getSize(a: Array[Byte]): Int = ByteBuffer.wrap(a).capacity + + private def getSize(s: String): Int = getSize(s.getBytes(UTF_8)) + + private def getSize(j: Json): Int = getSize(j.noSpaces) + + private def serializeBatch( + serializer: TSerializer, + event: CollectorPayload, + batches: List[List[Json]], + schema: SchemaKey + ): List[Array[Byte]] = + batches.map { batch => + val payload = event.deepCopy() + val body = SelfDescribingData[Json](schema, Json.arr(batch: _*)) + payload.setBody(body.asJson.noSpaces) + serializer.serialize(payload) + } +} + diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala new file mode 100644 index 000000000..c3b41fc6b --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -0,0 +1,32 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import io.circe.Json + +object model { + /** + * Case class for holding both good and + * bad sinks for the Stream Collector. + */ + final case class CollectorSinks[F[_]](good: Sink[F], bad: Sink[F]) + + /** + * Case class for holding the results of + * splitAndSerializePayload. + * + * @param good All good results + * @param bad All bad results + */ + final case class EventSerializeResult(good: List[Array[Byte]], bad: List[Array[Byte]]) + + /** + * Class for the result of splitting a too-large array of events in the body of a POST request + * + * @param goodBatches List of batches of events + * @param failedBigEvents List of events that were too large + */ + final case class SplitBatchResult(goodBatches: List[List[Json]], failedBigEvents: List[Json]) + + final case class CollectorConfig( + paths: Map[String, String], + ) +} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala index d3c2c7b57..3c1263086 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutesSpec.scala @@ -3,19 +3,60 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.effect.IO import cats.effect.unsafe.implicits.global import org.http4s.implicits.http4sLiteralsSyntax -import org.http4s.{Method, Request, Status} +import org.http4s.{Method, Request, RequestCookie, Response, Status} +import org.http4s.Status._ +import fs2.{Stream, text} import org.specs2.mutable.Specification +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ class CollectorRoutesSpec extends Specification { - "Health endpoint" should { - "return OK always because collector always works" in { - val request = Request[IO](method = Method.GET, uri = uri"/health") - val response = new CollectorRoutes[IO].value.run(request).unsafeRunSync() + val goodSink = new TestSink() + val badSink = new TestSink() + val sinks = CollectorSinks[IO](good = goodSink, bad = badSink) + val config = CollectorConfig( + paths = Map.empty + ) + + val collectorService = new Service[IO] { + override def cookie( + queryString: Option[String], + body: IO[Option[String]], + path: String, + cookie: Option[RequestCookie], + userAgent: Option[String], + refererUri: Option[String], + hostname: IO[Option[String]], + ip: Option[String], + request: Request[IO], + pixelExpected: Boolean, + doNotTrack: Boolean, + contentType: Option[String], + spAnonymous: Option[String] + ): IO[Response[IO]] = + IO.pure(Response(status = Ok, body = Stream.emit("cookie").through(text.utf8.encode))) + + override def determinePath(vendor: String, version: String): String = "/p1/p2" + } + val routes = new CollectorRoutes[IO](collectorService).value + + + "The collector route" should { + "respond to the health route with an ok response" in { + val request = Request[IO](method = Method.GET, uri = uri"/health") + val response = routes.run(request).unsafeRunSync() response.status must beEqualTo(Status.Ok) response.as[String].unsafeRunSync() must beEqualTo("OK") } + + "respond to the post cookie route with the cookie response" in { + val request = Request[IO](method = Method.POST, uri = uri"/p1/p2") + val response = routes.run(request).unsafeRunSync() + + response.status must beEqualTo(Status.Ok) + response.bodyText.compile.string.unsafeRunSync() must beEqualTo("cookie") + } } } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala new file mode 100644 index 000000000..18044f8c3 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -0,0 +1,261 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import scala.collection.JavaConverters._ +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import org.http4s.{Method, Request, Status, Headers, RequestCookie} +import org.http4s.headers._ +import com.comcast.ip4s.IpAddress +import org.specs2.mutable.Specification +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import org.apache.thrift.{TDeserializer, TSerializer} + +class CollectorServiceSpec extends Specification { + case class ProbeService(service: CollectorService[IO], good: TestSink, bad: TestSink) + + val service = new CollectorService[IO]( + config = TestUtils.testConf, + sinks = CollectorSinks[IO](new TestSink, new TestSink), + appName = "appName", + appVersion = "appVersion" + ) + val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") + val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r + + def probeService(): ProbeService = { + val good = new TestSink + val bad = new TestSink + val service = new CollectorService[IO]( + config = TestUtils.testConf, + sinks = CollectorSinks[IO](good, bad), + appName = "appName", + appVersion = "appVersion" + ) + ProbeService(service, good, bad) + } + + def emptyCollectorPayload: CollectorPayload = + new CollectorPayload(null, null, System.currentTimeMillis, null, null) + + def serializer = new TSerializer() + def deserializer = new TDeserializer() + + "The collector service" should { + "cookie" in { + "respond with a 200 OK and a good row in good sink" in { + val ProbeService(service, good, bad) = probeService() + val headers = Headers( + `X-Forwarded-For`(IpAddress.fromString("127.0.0.1")), + Cookie(RequestCookie("cookie", "value")), + `Access-Control-Allow-Credentials`() + ) + val req = Request[IO]( + method = Method.POST, + headers = headers + ) + val r = service.cookie( + queryString = Some("a=b"), + body = IO.pure(Some("b")), + path = "p", + cookie = None, + userAgent = Some("ua"), + refererUri = Some("ref"), + hostname = IO.pure(Some("h")), + ip = Some("ip"), + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = Some("image/gif"), + spAnonymous = None + ).unsafeRunSync() + + r.status mustEqual Status.Ok + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + + val e = emptyCollectorPayload + deserializer.deserialize(e, good.storedRawEvents.head) + e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" + e.ipAddress shouldEqual "ip" + e.encoding shouldEqual "UTF-8" + e.collector shouldEqual s"appName-appVersion" + e.querystring shouldEqual "a=b" + e.body shouldEqual "b" + e.path shouldEqual "p" + e.userAgent shouldEqual "ua" + e.refererUri shouldEqual "ref" + e.hostname shouldEqual "h" + //e.networkUserId shouldEqual "nuid" //TODO: add check for nuid as well + e.headers shouldEqual List( + "X-Forwarded-For: 127.0.0.1", + "Cookie: cookie=value", + "Access-Control-Allow-Credentials: true", + "image/gif" + ).asJava + e.contentType shouldEqual "image/gif" + } + + "sink event with headers removed when spAnonymous set" in { + val ProbeService(service, good, bad) = probeService() + val headers = Headers( + `X-Forwarded-For`(IpAddress.fromString("127.0.0.1")), + Cookie(RequestCookie("cookie", "value")), + `Access-Control-Allow-Credentials`() + ) + val req = Request[IO]( + method = Method.POST, + headers = headers + ) + val r = service.cookie( + queryString = Some("a=b"), + body = IO.pure(Some("b")), + path = "p", + cookie = None, + userAgent = Some("ua"), + refererUri = Some("ref"), + hostname = IO.pure(Some("h")), + ip = Some("ip"), + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = Some("image/gif"), + spAnonymous = Some("*") + ).unsafeRunSync() + + r.status mustEqual Status.Ok + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + + val e = emptyCollectorPayload + deserializer.deserialize(e, good.storedRawEvents.head) + e.headers shouldEqual List( + "Access-Control-Allow-Credentials: true", + "image/gif" + ).asJava + } + } + + "buildEvent" in { + "fill the correct values" in { + val ct = Some("image/gif") + val headers = List("X-Forwarded-For", "X-Real-Ip") + val e = service.buildEvent( + Some("q"), + Some("b"), + "p", + Some("ua"), + Some("ref"), + Some("h"), + "ip", + "nuid", + ct, + headers + ) + e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" + e.ipAddress shouldEqual "ip" + e.encoding shouldEqual "UTF-8" + e.collector shouldEqual s"appName-appVersion" + e.querystring shouldEqual "q" + e.body shouldEqual "b" + e.path shouldEqual "p" + e.userAgent shouldEqual "ua" + e.refererUri shouldEqual "ref" + e.hostname shouldEqual "h" + e.networkUserId shouldEqual "nuid" + e.headers shouldEqual (headers ::: ct.toList).asJava + e.contentType shouldEqual ct.get + } + + "set fields to null if they aren't set" in { + val headers = List() + val e = service.buildEvent( + None, + None, + "p", + None, + None, + None, + "ip", + "nuid", + None, + headers + ) + e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" + e.ipAddress shouldEqual "ip" + e.encoding shouldEqual "UTF-8" + e.collector shouldEqual s"appName-appVersion" + e.querystring shouldEqual null + e.body shouldEqual null + e.path shouldEqual "p" + e.userAgent shouldEqual null + e.refererUri shouldEqual null + e.hostname shouldEqual null + e.networkUserId shouldEqual "nuid" + e.headers shouldEqual headers.asJava + e.contentType shouldEqual null + } + } + + "sinkEvent" in { + "send back the produced events" in { + val ProbeService(s, good, bad) = probeService() + s.sinkEvent(event, "key").unsafeRunSync() + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + good.storedRawEvents.head.zip(serializer.serialize(event)).forall { case (a, b) => a mustEqual b } + } + } + + "ipAndPartitionkey" in { + "give back the ip and partition key as ip if remote address is defined" in { + val address = Some("127.0.0.1") + service.ipAndPartitionKey(address, true) shouldEqual (("127.0.0.1", "127.0.0.1")) + } + "give back the ip and a uuid as partition key if ipAsPartitionKey is false" in { + val address = Some("127.0.0.1") + val (ip, pkey) = service.ipAndPartitionKey(address, false) + ip shouldEqual "127.0.0.1" + pkey must beMatching(uuidRegex) + } + "give back unknown as ip and a random uuid as partition key if the address isn't known" in { + val (ip, pkey) = service.ipAndPartitionKey(None, true) + ip shouldEqual "unknown" + pkey must beMatching(uuidRegex) + } + } + + "determinePath" in { + val vendor = "com.acme" + val version1 = "track" + val version2 = "redirect" + val version3 = "iglu" + + "should correctly replace the path in the request if a mapping is provided" in { + val expected1 = "/com.snowplowanalytics.snowplow/tp2" + val expected2 = "/r/tp2" + val expected3 = "/com.snowplowanalytics.iglu/v1" + + service.determinePath(vendor, version1) shouldEqual expected1 + service.determinePath(vendor, version2) shouldEqual expected2 + service.determinePath(vendor, version3) shouldEqual expected3 + } + + "should pass on the original path if no mapping for it can be found" in { + val service = new CollectorService( + TestUtils.testConf.copy(paths = Map.empty[String, String]), + CollectorSinks(new TestSink, new TestSink), + "", + "" + ) + val expected1 = "/com.acme/track" + val expected2 = "/com.acme/redirect" + val expected3 = "/com.acme/iglu" + + service.determinePath(vendor, version1) shouldEqual expected1 + service.determinePath(vendor, version2) shouldEqual expected2 + service.determinePath(vendor, version3) shouldEqual expected3 + } + } + } +} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala new file mode 100644 index 000000000..12b9f4c27 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SplitBatchSpec.scala @@ -0,0 +1,145 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import org.apache.thrift.TDeserializer + +import io.circe.Json +import io.circe.parser._ +import io.circe.syntax._ + +import com.snowplowanalytics.iglu.core.circe.implicits._ +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.collectors.scalastream.model.SplitBatchResult + +import org.specs2.mutable.Specification + +class SplitBatchSpec extends Specification { + val splitBatch: SplitBatch = SplitBatch("app", "version") + + "SplitBatch.split" should { + "Batch a list of strings based on size" in { + splitBatch.split(List("a", "b", "c").map(Json.fromString), 9, 1) must_== + SplitBatchResult(List(List("c"), List("b", "a")).map(_.map(Json.fromString)), Nil) + } + + "Reject only those strings which are too big" in { + splitBatch.split(List("1234567", "1", "123").map(Json.fromString), 8, 0) must_== + SplitBatchResult(List(List("123", "1").map(Json.fromString)), List("1234567").map(Json.fromString)) + } + + "Batch a long list of strings" in { + splitBatch.split( + List("123456778901", "123456789", "12345678", "1234567", "123456", "12345", "1234", "123", "12", "1") + .map(Json.fromString), + 13, + 0 + ) must_== + SplitBatchResult( + List( + List("1", "12", "123"), + List("1234", "12345"), + List("123456"), + List("1234567"), + List("12345678"), + List("123456789") + ).map(_.map(Json.fromString)), + List("123456778901").map(Json.fromString) + ) + } + } + + "SplitBatch.splitAndSerializePayload" should { + "Serialize an empty CollectorPayload" in { + val actual = splitBatch.splitAndSerializePayload(new CollectorPayload(), 100) + val target = new CollectorPayload() + new TDeserializer().deserialize(target, actual.good.head) + target must_== new CollectorPayload() + } + + "Reject an oversized GET CollectorPayload" in { + val payload = new CollectorPayload() + payload.setQuerystring("x" * 1000) + val actual = splitBatch.splitAndSerializePayload(payload, 100) + val res = parse(new String(actual.bad.head)).toOption.get + val selfDesc = SelfDescribingData.parse(res).toOption.get + val badRow = selfDesc.data.as[BadRow].toOption.get + badRow must beAnInstanceOf[BadRow.SizeViolation] + val sizeViolation = badRow.asInstanceOf[BadRow.SizeViolation] + sizeViolation.failure.maximumAllowedSizeBytes must_== 100 + sizeViolation.failure.actualSizeBytes must_== 1019 + sizeViolation.failure.expectation must_== "oversized collector payload: GET requests cannot be split" + sizeViolation.payload.event must_== "CollectorP" + sizeViolation.processor shouldEqual Processor("app", "version") + actual.good must_== Nil + } + + "Reject an oversized POST CollectorPayload with an unparseable body" in { + val payload = new CollectorPayload() + payload.setBody("s" * 1000) + val actual = splitBatch.splitAndSerializePayload(payload, 100) + val res = parse(new String(actual.bad.head)).toOption.get + val selfDesc = SelfDescribingData.parse(res).toOption.get + val badRow = selfDesc.data.as[BadRow].toOption.get + badRow must beAnInstanceOf[BadRow.SizeViolation] + val sizeViolation = badRow.asInstanceOf[BadRow.SizeViolation] + sizeViolation.failure.maximumAllowedSizeBytes must_== 100 + sizeViolation.failure.actualSizeBytes must_== 1019 + sizeViolation + .failure + .expectation must_== "oversized collector payload: cannot split POST requests which are not json expected json value got 'ssssss...' (line 1, column 1)" + sizeViolation.payload.event must_== "CollectorP" + sizeViolation.processor shouldEqual Processor("app", "version") + } + + "Reject an oversized POST CollectorPayload which would be oversized even without its body" in { + val payload = new CollectorPayload() + val data = Json.obj( + "schema" := Json.fromString("s"), + "data" := Json.arr( + Json.obj("e" := "se", "tv" := "js"), + Json.obj("e" := "se", "tv" := "js") + ) + ) + payload.setBody(data.noSpaces) + payload.setPath("p" * 1000) + val actual = splitBatch.splitAndSerializePayload(payload, 1000) + actual.bad.size must_== 1 + val res = parse(new String(actual.bad.head)).toOption.get + val selfDesc = SelfDescribingData.parse(res).toOption.get + val badRow = selfDesc.data.as[BadRow].toOption.get + badRow must beAnInstanceOf[BadRow.SizeViolation] + val sizeViolation = badRow.asInstanceOf[BadRow.SizeViolation] + sizeViolation.failure.maximumAllowedSizeBytes must_== 1000 + sizeViolation.failure.actualSizeBytes must_== 1091 + sizeViolation + .failure + .expectation must_== "oversized collector payload: cannot split POST requests which are not self-describing Invalid Iglu URI: s, code: INVALID_IGLUURI" + sizeViolation + .payload + .event must_== "CollectorPayload(schema:null, ipAddress:null, timestamp:0, encoding:null, collector:null, path:ppppp" + sizeViolation.processor shouldEqual Processor("app", "version") + } + + "Split a CollectorPayload with three large events and four very large events" in { + val payload = new CollectorPayload() + val data = Json.obj( + "schema" := Schemas.SizeViolation.toSchemaUri, + "data" := Json.arr( + Json.obj("e" := "se", "tv" := "x" * 600), + Json.obj("e" := "se", "tv" := "x" * 5), + Json.obj("e" := "se", "tv" := "x" * 600), + Json.obj("e" := "se", "tv" := "y" * 1000), + Json.obj("e" := "se", "tv" := "y" * 1000), + Json.obj("e" := "se", "tv" := "y" * 1000), + Json.obj("e" := "se", "tv" := "y" * 1000) + ) + ) + payload.setBody(data.noSpaces) + val actual = splitBatch.splitAndSerializePayload(payload, 1000) + actual.bad.size must_== 4 + actual.good.size must_== 2 + } + } +} + diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala new file mode 100644 index 000000000..2c273a603 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala @@ -0,0 +1,20 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.IO + +import scala.collection.mutable.ListBuffer + +class TestSink extends Sink[IO] { + + private val buf: ListBuffer[Array[Byte]] = ListBuffer() + + override val maxBytes: Int = Int.MaxValue + + override def isHealthy: IO[Boolean] = IO.pure(true) + + override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = + IO.delay(buf ++= events) + + def storedRawEvents: List[Array[Byte]] = buf.toList + +} diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala new file mode 100644 index 000000000..cbcecc213 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala @@ -0,0 +1,14 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorConfig + +object TestUtils { + + val testConf = CollectorConfig( + paths = Map( + "/com.acme/track" -> "/com.snowplowanalytics.snowplow/tp2", + "/com.acme/redirect" -> "/r/tp2", + "/com.acme/iglu" -> "/com.snowplowanalytics.iglu/v1" + ) + ) +} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala index 62cc51ac7..11c5f5085 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/StdoutCollector.scala @@ -21,12 +21,21 @@ import cats.implicits._ import java.util.Base64 import java.io.PrintStream +import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo + object StdoutCollector extends IOApp { def run(args: List[String]): IO[ExitCode] = { val good = Resource.pure[IO, Sink[IO]](printingSink(System.out)) val bad = Resource.pure[IO, Sink[IO]](printingSink(System.err)) - CollectorApp.run[IO](good, bad) + CollectorApp.run[IO]( + good, + bad, + CollectorConfig(Map.empty), + BuildInfo.shortName, + BuildInfo.version + ) } private def printingSink[F[_]: Sync](stream: PrintStream): Sink[F] = new Sink[F] {