-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
904 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 38 additions & 5 deletions
43
...rc/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoutes.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,50 @@ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream | ||
|
||
import cats.implicits._ | ||
import cats.effect.Sync | ||
import org.http4s.{HttpApp, HttpRoutes} | ||
import org.typelevel.ci.CIString | ||
import org.http4s.{HttpApp, HttpRoutes, Request} | ||
import org.http4s.dsl.Http4sDsl | ||
import org.http4s.implicits._ | ||
import com.comcast.ip4s.Dns | ||
|
||
// NOTE(review): this hunk is a rendered diff without +/- markers, so lines from the old and the | ||
// new revision are interleaved (e.g. the two class headers directly below). Presumably the | ||
// (good, bad) constructor, `val _ = (good, bad)`, `lazy val value` and the first `.orNotFound` | ||
// belong to the removed revision - confirm against the real file before editing code here. | ||
// HTTP routing layer: exposes a health endpoint and a POST endpoint that forwards request data | ||
// to the collector service. | ||
class CollectorRoutes[F[_]: Sync](good: Sink[F], bad: Sink[F]) extends Http4sDsl[F] { | ||
class CollectorRoutes[F[_]: Sync](collectorService: Service[F]) extends Http4sDsl[F] { | ||
|
||
val _ = (good, bad) | ||
// Implicit DNS instance; presumably required by req.remoteHost below - confirm. | ||
implicit val dns: Dns[F] = Dns.forSync[F] | ||
|
||
lazy val value: HttpApp[F] = HttpRoutes | ||
// GET /health answers Ok with the body OK. | ||
private val healthRoutes = HttpRoutes | ||
.of[F] { | ||
case GET -> Root / "health" => | ||
Ok("OK") | ||
} | ||
.orNotFound | ||
|
||
// POST /<vendor>/<version>: resolves the path through the service, extracts the User-Agent, | ||
// Referer and SP-Anonymous headers and delegates the rest of the handling to | ||
// collectorService.cookie. | ||
private val cookieRoutes = HttpRoutes | ||
.of[F] { | ||
case req@POST -> Root / vendor / version => | ||
val path = collectorService.determinePath(vendor, version) | ||
val userAgent = extractHeader(req, "User-Agent") | ||
val referer = extractHeader(req, "Referer") | ||
val spAnonymous = extractHeader(req, "SP-Anonymous") | ||
|
||
collectorService.cookie( | ||
queryString = Some(req.queryString), | ||
body = req.bodyText.compile.string.map(Some(_)), | ||
path = path, | ||
cookie = None, //TODO: cookie will be added later | ||
userAgent = userAgent, | ||
refererUri = referer, | ||
hostname = req.remoteHost.map(_.map(_.toString)), | ||
ip = req.remoteAddr.map(_.toUriString), // TODO: Do not set the ip if request contains SP-Anonymous header | ||
request = req, | ||
pixelExpected = false, | ||
doNotTrack = false, | ||
contentType = req.contentType.map(_.value.toLowerCase), | ||
spAnonymous = spAnonymous | ||
) | ||
} | ||
|
||
// Complete application: health routes first, then the POST routes, with a not-found fallback. | ||
val value: HttpApp[F] = (healthRoutes <+> cookieRoutes).orNotFound | ||
|
||
// Looks a header up by name (wrapped in a CIString) and returns the value of the first match, | ||
// or None when the request carries no such header. | ||
def extractHeader(req:Request[F], headerName: String): Option[String] = | ||
req.headers.get(CIString(headerName)).map(_.head.value) | ||
} |
170 changes: 170 additions & 0 deletions
170
...c/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream | ||
|
||
import java.util.UUID | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import cats.effect.Sync | ||
import cats.implicits._ | ||
|
||
import org.http4s.{Request, Response, RequestCookie} | ||
import org.http4s.Status._ | ||
|
||
import org.typelevel.ci._ | ||
|
||
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload | ||
|
||
import com.snowplowanalytics.snowplow.collectors.scalastream.model._ | ||
|
||
// Contract between the HTTP routes and the collector logic; CollectorService implements it. | ||
trait Service[F[_]] { | ||
// Handles one request: receives the pieces already extracted from the HTTP request and | ||
// returns the HTTP response inside F. body and hostname are passed as effects in F; | ||
// contentType and spAnonymous default to None. | ||
def cookie( | ||
queryString: Option[String], | ||
body: F[Option[String]], | ||
path: String, | ||
cookie: Option[RequestCookie], | ||
userAgent: Option[String], | ||
refererUri: Option[String], | ||
hostname: F[Option[String]], | ||
ip: Option[String], | ||
request: Request[F], | ||
pixelExpected: Boolean, | ||
doNotTrack: Boolean, | ||
contentType: Option[String] = None, | ||
spAnonymous: Option[String] = None | ||
): F[Response[F]] | ||
// Computes the path to record for a request made to /<vendor>/<version>. | ||
def determinePath(vendor: String, version: String): String | ||
} | ||
|
||
// Service implementation: builds a CollectorPayload from the data extracted from an HTTP | ||
// request, splits and serializes it, and stores the result in the good and bad sinks. | ||
class CollectorService[F[_]: Sync]( | ||
config: CollectorConfig, | ||
sinks: CollectorSinks[F], | ||
appName: String, | ||
appVersion: String | ||
) extends Service[F] { | ||
|
||
// TODO: Add sink type as well | ||
// Collector identifier passed to every CollectorPayload created by buildEvent. | ||
private val collector = s"$appName-$appVersion" | ||
|
||
private val splitBatch: SplitBatch = SplitBatch(appName, appVersion) | ||
|
||
// Evaluates the body and hostname effects, builds a single event, sinks it and then yields | ||
// buildHttpResponse. The cookie, pixelExpected and doNotTrack parameters are accepted but | ||
// not used by this implementation yet. | ||
def cookie( | ||
queryString: Option[String], | ||
body: F[Option[String]], | ||
path: String, | ||
cookie: Option[RequestCookie], | ||
userAgent: Option[String], | ||
refererUri: Option[String], | ||
hostname: F[Option[String]], | ||
ip: Option[String], | ||
request: Request[F], | ||
pixelExpected: Boolean, | ||
doNotTrack: Boolean, | ||
contentType: Option[String] = None, | ||
spAnonymous: Option[String] = None | ||
): F[Response[F]] = | ||
for { | ||
body <- body | ||
hostname <- hostname | ||
// TODO: Get ipAsPartitionKey from config | ||
(ipAddress, partitionKey) = ipAndPartitionKey(ip, ipAsPartitionKey = false) | ||
// TODO: nuid should be set properly | ||
nuid = UUID.randomUUID().toString | ||
event = buildEvent( | ||
queryString, | ||
body, | ||
path, | ||
userAgent, | ||
refererUri, | ||
hostname, | ||
ipAddress, | ||
nuid, | ||
contentType, | ||
headers(request, spAnonymous) | ||
) | ||
_ <- sinkEvent(event, partitionKey) | ||
} yield buildHttpResponse | ||
|
||
// Looks /<vendor>/<version> up in config.paths and returns the mapped path, or the original | ||
// path unchanged when there is no mapping for it. | ||
def determinePath(vendor: String, version: String): String = { | ||
val original = s"/$vendor/$version" | ||
config.paths.getOrElse(original, original) | ||
} | ||
|
||
/** Builds a raw event from an Http request. */ | ||
// Optional inputs are only set on the payload when they are defined; path and networkUserId | ||
// are always set. | ||
def buildEvent( | ||
queryString: Option[String], | ||
body: Option[String], | ||
path: String, | ||
userAgent: Option[String], | ||
refererUri: Option[String], | ||
hostname: Option[String], | ||
ipAddress: String, | ||
networkUserId: String, | ||
contentType: Option[String], | ||
headers: List[String] | ||
): CollectorPayload = { | ||
val e = new CollectorPayload( | ||
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", | ||
ipAddress, | ||
System.currentTimeMillis, | ||
"UTF-8", | ||
collector | ||
) | ||
queryString.foreach(e.querystring = _) | ||
body.foreach(e.body = _) | ||
e.path = path | ||
userAgent.foreach(e.userAgent = _) | ||
refererUri.foreach(e.refererUri = _) | ||
hostname.foreach(e.hostname = _) | ||
e.networkUserId = networkUserId | ||
// The content type, when defined, is appended to the header list and is also stored in its | ||
// own field on the next line. | ||
e.headers = (headers ++ contentType).asJava | ||
contentType.foreach(e.contentType = _) | ||
e | ||
} | ||
|
||
// TODO: Handle necessary cases to build http response in here | ||
// Currently always an empty response with status Ok. | ||
def buildHttpResponse: Response[F] = Response(status = Ok) | ||
|
||
// TODO: Since Remote-Address and Raw-Request-URI are akka-specific headers, | ||
// they aren't included in here. It might be good to search for counterparts in Http4s. | ||
/** If the SP-Anonymous header is not present (spAnonymous is empty), retrieves all headers | ||
* from the request, each rendered as a string. | ||
* If the SP-Anonymous header is present (spAnonymous is defined), filters out the | ||
* X-Forwarded-For, X-Real-IP and Cookie headers. | ||
*/ | ||
def headers(request: Request[F], spAnonymous: Option[String]): List[String] = | ||
request.headers.headers.flatMap { h => | ||
h.name match { | ||
case ci"X-Forwarded-For" | ci"X-Real-Ip" | ci"Cookie" if spAnonymous.isDefined => None | ||
case _ => Some(h.toString()) | ||
} | ||
} | ||
|
||
/** Produces the event to the configured sinks, using the same partition key for both. */ | ||
def sinkEvent( | ||
event: CollectorPayload, | ||
partitionKey: String | ||
): F[Unit] = | ||
for { | ||
// Split events into Good and Bad, using the good sink's maxBytes as the size limit | ||
eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes)) | ||
// Send events to respective sinks | ||
_ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey) | ||
_ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) | ||
} yield () | ||
|
||
/** | ||
* Resolves the IP to record and the partition key to use for an event. | ||
* The partition key is the IP only when an IP is available and ipAsPartitionKey is true; | ||
* in every other case a random UUID is generated. | ||
* | ||
* @param ipAddress IP address extracted from an HTTP request, if any | ||
* @param ipAsPartitionKey Whether to use the ip as a partition key or a random UUID | ||
* @return a tuple of ip (the string unknown if it couldn't be extracted) and partition key | ||
*/ | ||
def ipAndPartitionKey( | ||
ipAddress: Option[String], | ||
ipAsPartitionKey: Boolean | ||
): (String, String) = | ||
ipAddress match { | ||
case None => ("unknown", UUID.randomUUID.toString) | ||
case Some(ip) => (ip, if (ipAsPartitionKey) ip else UUID.randomUUID.toString) | ||
} | ||
} |
Oops, something went wrong.