Skip to content

Commit

Permalink
Hostname telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Oct 2, 2023
1 parent eee1035 commit 2cfce45
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 70 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ lazy val http4s = project
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.ceTestkit,

//Integration tests
Dependencies.Libraries.IT.testcontainers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ object Run {
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
sinks <- mkSinks(config.streams)
hostnameSet <- Resource.eval(Telemetry.HostnameSet.init(config.telemetry))
collectorService = new Service[F](
config,
Sinks(sinks.good, sinks.bad),
appInfo
appInfo,
hostnameSet
)
httpServer = HttpServer.build[F](
new Routes[F](config.enableDefaultRedirect, collectorService).value,
Expand All @@ -78,20 +80,24 @@ object Run {
)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
} yield httpClient

resources.use { httpClient =>
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
telemetryInfo(config).region,
telemetryInfo(config).cloud
)
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
} yield (httpClient, hostnameSet)

resources.use {
case (httpClient, hostnameSet) =>
val appId = java.util.UUID.randomUUID.toString
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
appId,
telemetryInfo(config).region,
telemetryInfo(config).cloud,
hostnameSet
)
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import java.util.UUID

import org.apache.commons.codec.binary.Base64

import com.comcast.ip4s.Dns

import scala.concurrent.duration._
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -47,9 +49,12 @@ object Service {
class Service[F[_]: Sync](
config: Config[Any],
sinks: Sinks[F],
appInfo: AppInfo
appInfo: AppInfo,
hostnameSet: Telemetry.HostnameSet[F]
) extends IService[F] {

implicit val dns: Dns[F] = Dns.forSync[F]

val pixelStream = Stream.iterable[F, Byte](Service.pixel)

private val collector = s"${appInfo.name}:${appInfo.version}"
Expand All @@ -67,7 +72,6 @@ class Service[F[_]: Sync](
for {
body <- body
redirect = path.startsWith("/r/")
hostname = extractHostname(request)
userAgent = extractHeader(request, "User-Agent")
refererUri = extractHeader(request, "Referer")
spAnonymous = extractHeader(request, "SP-Anonymous")
Expand All @@ -77,6 +81,7 @@ class Service[F[_]: Sync](
nuidOpt = networkUserId(request, cookie, spAnonymous)
nuid = nuidOpt.getOrElse(UUID.randomUUID().toString)
(ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey)
hostname <- extractHostname(request)
event = buildEvent(
queryString,
body,
Expand Down Expand Up @@ -105,6 +110,7 @@ class Service[F[_]: Sync](
`Access-Control-Allow-Credentials`().toRaw1.some
).flatten
responseHeaders = Headers(headerList)
_ <- hostname.map(hostnameSet.add).getOrElse(Sync[F].unit)
_ <- sinkEvent(event, partitionKey)
resp = buildHttpResponse(
queryParams = request.uri.query.params,
Expand Down Expand Up @@ -138,8 +144,8 @@ class Service[F[_]: Sync](
/** Looks up the collector's own cookie among the cookies sent with the request.
  *
  * @param req incoming HTTP request
  * @return the first request cookie whose name equals `config.cookie.name`, if any
  */
def extractCookie(req: Request[F]): Option[RequestCookie] = {
  val cookieName = config.cookie.name
  req.cookies.collectFirst { case candidate if candidate.name == cookieName => candidate }
}

def extractHostname(req: Request[F]): Option[String] =
req.uri.authority.map(_.host.renderString) // Hostname is extracted like this in Akka-Http as well
def extractHostname(req: Request[F]): F[Option[String]] =
req.remoteHost.map(_.map(_.toString))

def extractIp(req: Request[F], spAnonymous: Option[String]): Option[String] =
spAnonymous match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package com.snowplowanalytics.snowplow.collector.core
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import org.apache.commons.codec.digest.DigestUtils

import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Random
import cats.effect.kernel.Ref

import fs2.Stream

Expand All @@ -32,22 +35,30 @@ object Telemetry {
telemetryConfig: Config.Telemetry,
httpClient: HttpClient[F],
appInfo: AppInfo,
appGeneratedId: String,
region: Option[String],
cloud: Option[String]
cloud: Option[String],
hostnameSet: HostnameSet[F]
): Stream[F, Unit] =
if (telemetryConfig.disable)
Stream.empty.covary[F]
else {
val sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version
)
Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters()
for {
hostnames <- hostnameSet.getHashed
sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version,
appGeneratedId,
hostnames
)
_ <- tracker.trackSelfDescribingEvent(unstructEvent = sdj)
_ <- tracker.flushEmitters()
} yield ()
}
}
}
Expand Down Expand Up @@ -93,26 +104,60 @@ object Telemetry {
region: Option[String],
cloud: Option[String],
appName: String,
appVersion: String
appVersion: String,
appGeneratedId: String,
hashedHostnames: Set[String]
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
Json.obj(
"userProvidedId" -> teleCfg.userProvidedId.asJson,
"autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
"moduleName" -> teleCfg.moduleName.asJson,
"moduleVersion" -> teleCfg.moduleVersion.asJson,
"instanceId" -> teleCfg.instanceId.asJson,
"appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
"appGeneratedId" -> appGeneratedId.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"applicationName" -> appName.asJson,
"applicationVersion" -> appVersion.asJson
"applicationVersion" -> appVersion.asJson,
"hashedHostnames" -> hashedHostnames.asJson
)
)

/** Deployment details reported alongside telemetry heartbeats.
  *
  * Both fields are optional and are forwarded as the `region` and `cloud`
  * arguments of `Telemetry.run`.
  *
  * @param region region the application runs in, when known
  * @param cloud cloud provider the application runs on, when known
  */
case class TelemetryInfo(
region: Option[String],
cloud: Option[String]
)

/** Accumulates hostnames so they can be reported, in hashed form only, by telemetry.
  *
  * Instances are obtained through `HostnameSet.init`.
  */
trait HostnameSet[F[_]] {
/** Records `hostname`; whether it is actually stored depends on the implementation. */
def add(hostname: String): F[Unit]
/** Returns the hashed form of the recorded hostnames (the `Ref`-backed implementation uses SHA-256 hex digests). */
def getHashed: F[Set[String]]
}

object HostnameSet {

  /** Builds a `HostnameSet` that keeps every added hostname in a `Ref`.
    *
    * Hostnames are stored as given and hashed with SHA-256 (hex encoded)
    * each time `getHashed` is called.
    */
  private def create[F[_]: Sync]: F[HostnameSet[F]] =
    for {
      seen <- Ref.of[F, Set[String]](Set.empty[String])
    } yield new HostnameSet[F] {
      override def add(hostname: String): F[Unit] =
        seen.update(current => current + hostname)

      override def getHashed: F[Set[String]] =
        seen.get.map(names => names.map(name => DigestUtils.sha256Hex(name)))
    }

  /** Builds a `HostnameSet` that ignores additions and always reports an empty set. */
  private def createNoop[F[_]: Sync]: F[HostnameSet[F]] = {
    val noop: HostnameSet[F] = new HostnameSet[F] {
      override def add(hostname: String): F[Unit]  = Sync[F].unit
      override def getHashed: F[Set[String]]       = Sync[F].pure(Set.empty[String])
    }
    Sync[F].pure(noop)
  }

  /** Chooses the implementation from the telemetry configuration.
    *
    * @param telemetryConfig telemetry settings; when `disable` is true nothing is recorded
    * @return a no-op set if telemetry is disabled, otherwise a `Ref`-backed set
    */
  def init[F[_]: Sync](telemetryConfig: Config.Telemetry): F[HostnameSet[F]] =
    if (telemetryConfig.disable) createNoop[F]
    else create[F]
}
}
Loading

0 comments on commit 2cfce45

Please sign in to comment.