From 2cfce4554850610ea697b476ef5c7d9f302735b7 Mon Sep 17 00:00:00 2001 From: spenes Date: Fri, 29 Sep 2023 12:17:32 +0300 Subject: [PATCH] Hostname telemetry --- build.sbt | 1 + .../Run.scala | 38 ++-- .../Service.scala | 14 +- .../Telemetry.scala | 71 ++++++-- .../ServiceSpec.scala | 132 ++++++++++---- .../TelemetrySpec.scala | 164 ++++++++++++++++++ project/Dependencies.scala | 6 +- 7 files changed, 356 insertions(+), 70 deletions(-) create mode 100644 http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala diff --git a/build.sbt b/build.sbt index 177c5ad7b..0d9594eac 100644 --- a/build.sbt +++ b/build.sbt @@ -147,6 +147,7 @@ lazy val http4s = project Dependencies.Libraries.emitterHttps, Dependencies.Libraries.specs2, Dependencies.Libraries.specs2CE, + Dependencies.Libraries.ceTestkit, //Integration tests Dependencies.Libraries.IT.testcontainers, diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 25297d818..796771259 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -64,11 +64,13 @@ object Run { config: Config[SinkConfig] ): F[ExitCode] = { val resources = for { - sinks <- mkSinks(config.streams) + sinks <- mkSinks(config.streams) + hostnameSet <- Resource.eval(Telemetry.HostnameSet.init(config.telemetry)) collectorService = new Service[F]( config, Sinks(sinks.good, sinks.bad), - appInfo + appInfo, + hostnameSet ) httpServer = HttpServer.build[F]( new Routes[F](config.enableDefaultRedirect, collectorService).value, @@ -78,20 +80,24 @@ object Run { ) _ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer) httpClient <- BlazeClientBuilder[F].resource - } yield httpClient - - resources.use { httpClient => - Telemetry - .run( - config.telemetry, - httpClient, - appInfo, - telemetryInfo(config).region, - telemetryInfo(config).cloud - ) - .compile - .drain - .flatMap(_ => Async[F].never[ExitCode]) + } yield (httpClient, hostnameSet) + + resources.use { + case (httpClient, hostnameSet) => + val appId = java.util.UUID.randomUUID.toString + Telemetry + .run( + config.telemetry, + httpClient, + appInfo, + appId, + telemetryInfo(config).region, + telemetryInfo(config).cloud, + hostnameSet + ) + .compile + .drain + .flatMap(_ => Async[F].never[ExitCode]) } } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala index ad3373cea..7ffad70b9 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Service.scala @@ -4,6 +4,8 @@ import java.util.UUID import org.apache.commons.codec.binary.Base64 +import com.comcast.ip4s.Dns + import scala.concurrent.duration._ import scala.collection.JavaConverters._ @@ -47,9 +49,12 @@ object Service { class Service[F[_]: Sync]( config: Config[Any], sinks: Sinks[F], - appInfo: AppInfo + appInfo: AppInfo, + hostnameSet: Telemetry.HostnameSet[F] ) extends IService[F] { + implicit val dns: Dns[F] = Dns.forSync[F] + val pixelStream = Stream.iterable[F, Byte](Service.pixel) private val collector = s"${appInfo.name}:${appInfo.version}" @@ -67,7 +72,6 @@ class Service[F[_]: Sync]( for { body <- body redirect = path.startsWith("/r/") - hostname = extractHostname(request) userAgent = extractHeader(request, "User-Agent") refererUri = extractHeader(request, "Referer") spAnonymous = extractHeader(request, "SP-Anonymous") @@ -77,6 +81,7 @@ class Service[F[_]: Sync]( nuidOpt = networkUserId(request, cookie, spAnonymous) nuid = nuidOpt.getOrElse(UUID.randomUUID().toString) (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey) + hostname <- extractHostname(request) event = buildEvent( queryString, body, @@ -105,6 +110,7 @@ class Service[F[_]: Sync]( `Access-Control-Allow-Credentials`().toRaw1.some ).flatten responseHeaders = Headers(headerList) + _ <- hostname.map(hostnameSet.add).getOrElse(Sync[F].unit) _ <- sinkEvent(event, partitionKey) resp = buildHttpResponse( queryParams = request.uri.query.params, @@ -138,8 +144,8 @@ class Service[F[_]: Sync]( def extractCookie(req: Request[F]): Option[RequestCookie] = req.cookies.find(_.name == config.cookie.name) - def extractHostname(req: Request[F]): Option[String] = - req.uri.authority.map(_.host.renderString) // Hostname is extracted like this in Akka-Http as well + def extractHostname(req: Request[F]): F[Option[String]] = + req.remoteHost.map(_.map(_.toString)) def extractIp(req: Request[F], spAnonymous: Option[String]): Option[String] = spAnonymous match { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala index 95df9bebc..0cf681467 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala @@ -3,11 +3,14 @@ package com.snowplowanalytics.snowplow.collector.core import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import org.apache.commons.codec.digest.DigestUtils + import cats.data.NonEmptyList import cats.implicits._ import cats.effect.{Async, Resource, Sync} import cats.effect.std.Random +import cats.effect.kernel.Ref import fs2.Stream @@ -32,22 +35,30 @@ object Telemetry { telemetryConfig: Config.Telemetry, httpClient: HttpClient[F], appInfo: AppInfo, + appGeneratedId: String, region: Option[String], - cloud: Option[String] + cloud: Option[String], + hostnameSet: HostnameSet[F] ): Stream[F, Unit] = if (telemetryConfig.disable) Stream.empty.covary[F] else { - val sdj = makeHeartbeatEvent( - telemetryConfig, - region, - cloud, - appInfo.moduleName, - appInfo.version - ) Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker => Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ => - tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters() + for { + hostnames <- hostnameSet.getHashed + sdj = makeHeartbeatEvent( + telemetryConfig, + region, + cloud, + appInfo.moduleName, + appInfo.version, + appGeneratedId, + hostnames + ) + _ <- tracker.trackSelfDescribingEvent(unstructEvent = sdj) + _ <- tracker.flushEmitters() + } yield () } } } @@ -93,21 +104,24 @@ object Telemetry { region: Option[String], cloud: Option[String], appName: String, - appVersion: String + appVersion: String, + appGeneratedId: String, + hashedHostnames: Set[String] ): SelfDescribingData[Json] = SelfDescribingData( - SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)), + SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)), Json.obj( "userProvidedId" -> teleCfg.userProvidedId.asJson, "autoGeneratedId" -> teleCfg.autoGeneratedId.asJson, "moduleName" -> teleCfg.moduleName.asJson, "moduleVersion" -> teleCfg.moduleVersion.asJson, "instanceId" -> teleCfg.instanceId.asJson, - "appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson, + "appGeneratedId" -> appGeneratedId.asJson, "cloud" -> cloud.asJson, "region" -> region.asJson, "applicationName" -> appName.asJson, - "applicationVersion" -> appVersion.asJson + "applicationVersion" -> appVersion.asJson, + "hashedHostnames" -> hashedHostnames.asJson ) ) @@ -115,4 +129,35 @@ object Telemetry { region: Option[String], cloud: Option[String] ) + + trait HostnameSet[F[_]] { + def add(hostname: String): F[Unit] + def getHashed: F[Set[String]] + } + + object HostnameSet { + private def create[F[_]: Sync]: F[HostnameSet[F]] = + Ref + .of[F, Set[String]](Set.empty) + .map(ref => + new HostnameSet[F] { + override def add(hostname: String): F[Unit] = + ref.update(_ + hostname) + + override def getHashed: F[Set[String]] = + ref.get.map(s => s.map(DigestUtils.sha256Hex)) + } + ) + + private def createNoop[F[_]: Sync]: F[HostnameSet[F]] = + Sync[F].pure { + new HostnameSet[F] { + override def add(hostname: String): F[Unit] = Sync[F].unit + override def getHashed: F[Set[String]] = Sync[F].pure(Set.empty) + } + } + + def init[F[_]: Sync](telemetryConfig: Config.Telemetry): F[HostnameSet[F]] = + if (telemetryConfig.disable) createNoop[F] else create[F] + } } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala index f44bfba02..f51595b5e 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ServiceSpec.scala @@ -2,6 +2,7 @@ package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.collection.mutable import org.specs2.mutable.Specification @@ -25,12 +26,19 @@ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPa import com.snowplowanalytics.snowplow.collector.core.model._ class ServiceSpec extends Specification { - case class ProbeService(service: Service[IO], good: TestSink, bad: TestSink) + case class ProbeService(service: Service[IO], good: TestSink, bad: TestSink, hostnameSet: Telemetry.HostnameSet[IO]) + + def telemetryHostnameSet = new Telemetry.HostnameSet[IO] { + val testHostnameSet = mutable.Set.empty[String] + override def add(hostname: String): IO[Unit] = IO(testHostnameSet.add(hostname)).void + override def getHashed: IO[Set[String]] = IO(testHostnameSet.toSet) + } val service = new Service( - config = TestUtils.testConfig, - sinks = Sinks(new TestSink, new TestSink), - appInfo = TestUtils.appInfo + config = TestUtils.testConfig, + sinks = Sinks(new TestSink, new TestSink), + appInfo = TestUtils.appInfo, + hostnameSet = telemetryHostnameSet ) val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r @@ -43,20 +51,22 @@ class ServiceSpec extends Specification { `Access-Control-Allow-Credentials`() ) val testConnection = Request.Connection( - local = SocketAddress.fromStringIp("192.0.2.1:80").get, - remote = SocketAddress.fromStringIp("192.0.2.2:80").get, + local = SocketAddress.fromStringIp("127.0.0.1:80").get, + remote = SocketAddress.fromStringIp("127.0.0.1:80").get, secure = false ) def probeService(config: Config[Any] = TestUtils.testConfig): ProbeService = { - val good = new TestSink - val bad = new TestSink + val good = new TestSink + val bad = new TestSink + val hostnameSet: Telemetry.HostnameSet[IO] = telemetryHostnameSet val service = new Service( - config = config, - sinks = Sinks(good, bad), - appInfo = TestUtils.appInfo + config = config, + sinks = Sinks(good, bad), + appInfo = TestUtils.appInfo, + hostnameSet = hostnameSet ) - ProbeService(service, good, bad) + ProbeService(service, good, bad, hostnameSet) } def emptyCollectorPayload: CollectorPayload = @@ -86,8 +96,8 @@ class ServiceSpec extends Specification { r.headers.get(ci"Set-Cookie") must beNone } "not set a network_userid from cookie if SP-Anonymous is present" in { - val ProbeService(service, good, bad) = probeService() - val nuid = "test-nuid" + val ProbeService(service, good, bad, _) = probeService() + val nuid = "test-nuid" val req = Request[IO]( method = Method.POST, headers = Headers( @@ -113,8 +123,8 @@ class ServiceSpec extends Specification { e.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" } "network_userid from cookie should persist if SP-Anonymous is not present" in { - val ProbeService(service, good, bad) = probeService() - val nuid = "test-nuid" + val ProbeService(service, good, bad, _) = probeService() + val nuid = "test-nuid" val req = Request[IO]( method = Method.POST ).addCookie(TestUtils.testConfig.cookie.name, nuid) @@ -137,7 +147,7 @@ class ServiceSpec extends Specification { e.networkUserId shouldEqual "test-nuid" } "use the ip address from 'X-Forwarded-For' header if it exists" in { - val ProbeService(service, good, bad) = probeService() + val ProbeService(service, good, bad, _) = probeService() val req = Request[IO]( method = Method.POST, headers = Headers( @@ -163,7 +173,7 @@ class ServiceSpec extends Specification { e.ipAddress shouldEqual "192.0.2.4" } "use the ip address from remote address if 'X-Forwarded-For' header doesn't exist" in { - val ProbeService(service, good, bad) = probeService() + val ProbeService(service, good, bad, _) = probeService() val req = Request[IO]( method = Method.POST ).withAttribute(Request.Keys.ConnectionInfo, testConnection) @@ -183,10 +193,10 @@ class ServiceSpec extends Specification { bad.storedRawEvents must have size 0 val e = emptyCollectorPayload deserializer.deserialize(e, good.storedRawEvents.head) - e.ipAddress shouldEqual "192.0.2.2" + e.ipAddress shouldEqual "127.0.0.1" } "set the ip address to 'unknown' if if SP-Anonymous is present" in { - val ProbeService(service, good, bad) = probeService() + val ProbeService(service, good, bad, _) = probeService() val req = Request[IO]( method = Method.POST, headers = Headers( @@ -212,14 +222,13 @@ class ServiceSpec extends Specification { e.ipAddress shouldEqual "unknown" } "respond with a 200 OK and a good row in good sink" in { - val ProbeService(service, good, bad) = probeService() - val nuid = "dfdb716e-ecf9-4d00-8b10-44edfbc8a108" + val ProbeService(service, good, bad, _) = probeService() + val nuid = "dfdb716e-ecf9-4d00-8b10-44edfbc8a108" val req = Request[IO]( method = Method.POST, headers = testHeaders, uri = Uri( - query = Query.unsafeFromString("a=b"), - authority = Some(Uri.Authority(host = Uri.RegName("example.com"))) + query = Query.unsafeFromString("a=b") ) ).withAttribute(Request.Keys.ConnectionInfo, testConnection).addCookie(TestUtils.testConfig.cookie.name, nuid) val r = service @@ -248,7 +257,7 @@ class ServiceSpec extends Specification { e.path shouldEqual "p" e.userAgent shouldEqual "testUserAgent" e.refererUri shouldEqual "example.com" - e.hostname shouldEqual "example.com" + e.hostname shouldEqual "localhost" e.networkUserId shouldEqual nuid e.headers shouldEqual List( "User-Agent: testUserAgent", @@ -263,7 +272,7 @@ class ServiceSpec extends Specification { } "sink event with headers removed when spAnonymous set" in { - val ProbeService(service, good, bad) = probeService() + val ProbeService(service, good, bad, _) = probeService() val req = Request[IO]( method = Method.POST, @@ -372,8 +381,8 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set("snowplow.acme.com", "example.com") ) - val testPath = "/r/example?u=https://snowplow.acme.com/12" - val ProbeService(service, good, bad) = probeService(config = testConf) + val testPath = "/r/example?u=https://snowplow.acme.com/12" + val ProbeService(service, good, bad, _) = probeService(config = testConf) val req = Request[IO]( method = Method.GET, uri = Uri.unsafeFromString(testPath) @@ -394,6 +403,58 @@ class ServiceSpec extends Specification { good.storedRawEvents must have size 1 bad.storedRawEvents must have size 0 } + + "add hostname to telemetry hostname set" in { + val ProbeService(service, good, bad, hostnameSet) = probeService() + val req = Request[IO]( + method = Method.POST + ).withAttribute(Request.Keys.ConnectionInfo, testConnection) + val r = service + .cookie( + body = IO.pure(Some("b")), + path = "p", + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = Some("image/gif") + ) + .unsafeRunSync() + + r.status mustEqual Status.Ok + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + + hostnameSet.getHashed.unsafeRunSync() must beEqualTo(Set("localhost")) + + val e = emptyCollectorPayload + deserializer.deserialize(e, good.storedRawEvents.head) + e.hostname shouldEqual "localhost" + } + + "don't add anything to telemetry hostname set if request doesn't have hostname" in { + val ProbeService(service, good, bad, hostnameSet) = probeService() + val req = Request[IO](method = Method.POST) + val r = service + .cookie( + body = IO.pure(Some("b")), + path = "p", + request = req, + pixelExpected = false, + doNotTrack = false, + contentType = Some("image/gif") + ) + .unsafeRunSync() + + r.status mustEqual Status.Ok + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + + hostnameSet.getHashed.unsafeRunSync() must beEqualTo(Set.empty) + + val e = emptyCollectorPayload + deserializer.deserialize(e, good.storedRawEvents.head) + e.hostname shouldEqual null + } } "preflightResponse" in { @@ -471,7 +532,7 @@ class ServiceSpec extends Specification { "sinkEvent" in { "send back the produced events" in { - val ProbeService(s, good, bad) = probeService() + val ProbeService(s, good, bad, _) = probeService() s.sinkEvent(event, "key").unsafeRunSync() good.storedRawEvents must have size 1 bad.storedRawEvents must have size 0 @@ -486,7 +547,7 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set("example1.com", "example2.com") ) - val ProbeService(service, _, _) = probeService(config = testConfig) + val ProbeService(service, _, _, _) = probeService(config = testConfig) val res = service.buildHttpResponse( queryParams = Map("u" -> "https://example1.com/12"), headers = testHeaders, @@ -548,7 +609,7 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set("example1.com", "example2.com") ) - val ProbeService(service, _, _) = probeService(config = testConfig) + val ProbeService(service, _, _, _) = probeService(config = testConfig) val res = service.buildRedirectHttpResponse( queryParams = Map("u" -> "https://example1.com/12"), headers = testHeaders @@ -562,7 +623,7 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set("example1.com", "example2.com") ) - val ProbeService(service, _, _) = probeService(config = testConfig) + val ProbeService(service, _, _, _) = probeService(config = testConfig) val res = service.buildRedirectHttpResponse( queryParams = Map.empty, headers = testHeaders @@ -576,7 +637,7 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set("example1.com", "example2.com") ) - val ProbeService(service, _, _) = probeService(config = testConfig) + val ProbeService(service, _, _, _) = probeService(config = testConfig) val res = service.buildRedirectHttpResponse( queryParams = Map("u" -> "https://invalidexample1.com/12"), headers = testHeaders @@ -590,7 +651,7 @@ class ServiceSpec extends Specification { .copy( redirectDomains = Set.empty ) - val ProbeService(service, _, _) = probeService(config = testConfig) + val ProbeService(service, _, _, _) = probeService(config = testConfig) val res = service.buildRedirectHttpResponse( queryParams = Map("u" -> "https://unknown.example.com/12"), headers = testHeaders @@ -1005,7 +1066,8 @@ class ServiceSpec extends Specification { val service = new Service( TestUtils.testConfig.copy(paths = Map.empty[String, String]), Sinks(new TestSink, new TestSink), - TestUtils.appInfo + TestUtils.appInfo, + telemetryHostnameSet ) val expected1 = "/com.acme/track" val expected2 = "/com.acme/redirect" diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala new file mode 100644 index 000000000..a9c047a41 --- /dev/null +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala @@ -0,0 +1,164 @@ +package com.snowplowanalytics.snowplow.collector.core + +import scala.concurrent.duration._ +import scala.collection.mutable.ListBuffer + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.codec.digest.DigestUtils + +import java.nio.charset.StandardCharsets + +import cats.implicits._ + +import cats.effect._ +import cats.effect.unsafe.implicits.global +import cats.effect.testkit.TestControl + +import org.http4s._ +import org.http4s.client.{Client => HttpClient} + +import io.circe._ +import io.circe.parser._ +import io.circe.syntax._ + +import fs2.Stream + +import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking + +import org.specs2.mutable.Specification + +class TelemetrySpec extends Specification { + + case class ProbeTelemetry( + telemetryStream: Stream[IO, Unit], + telemetryEvents: ListBuffer[Json], + hostnameSet: Telemetry.HostnameSet[IO] + ) + + val appId = "testAppId" + val region = Some("testRegion") + val cloud = Some("testCloud") + val interval = 5.minutes + val telemetryConfig = Config.Telemetry( + disable = false, + interval = interval, + method = "POST", + url = "127.0.0.1", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None + ) + + def probeTelemetry(telemetryConfig: Config.Telemetry): ProbeTelemetry = { + val telemetryEvents = ListBuffer[Json]() + val httpApp = HttpRoutes + .of[IO] { + case req => + IO { + telemetryEvents += extractTelemetryEvent(req) + Response[IO](status = Status.Ok) + } + } + .orNotFound + val client = HttpClient.fromHttpApp[IO](httpApp) + val hostnameSet = Telemetry.HostnameSet.init[IO](telemetryConfig).unsafeRunSync() + val telemetryStream = Telemetry.run[IO]( + telemetryConfig, + client, + TestUtils.appInfo, + appId, + region, + cloud, + hostnameSet + ) + ProbeTelemetry(telemetryStream, telemetryEvents, hostnameSet) + } + + def extractTelemetryEvent(req: Request[IO]): Json = { + val body = req.bodyText.compile.string.unsafeRunSync() + val jsonBody = parse(body).toOption.get + val uepxEncoded = jsonBody.hcursor.downField("data").downN(0).downField("ue_px").as[String].toOption.get + val uePxDecoded = new String(Base64.decodeBase64(uepxEncoded), StandardCharsets.UTF_8) + parse(uePxDecoded).toOption.get.hcursor.downField("data").as[Json].toOption.get + } + + def expectedEvent( + config: Config.Telemetry, + hostnameCount: Int + ): Json = { + val hashedHostnames = (1 to hostnameCount).map(i => DigestUtils.sha256Hex(i.toString)).toSet + Json.obj( + "schema" -> "iglu:com.snowplowanalytics.oss/oss_context/jsonschema/1-0-2".asJson, + "data" -> Json.obj( + "userProvidedId" -> config.userProvidedId.asJson, + "autoGeneratedId" -> config.autoGeneratedId.asJson, + "moduleName" -> config.moduleName.asJson, + "moduleVersion" -> config.moduleVersion.asJson, + "instanceId" -> config.instanceId.asJson, + "appGeneratedId" -> appId.asJson, + "cloud" -> cloud.asJson, + "region" -> region.asJson, + "applicationName" -> TestUtils.appInfo.name.asJson, + "applicationVersion" -> TestUtils.appInfo.version.asJson, + "hashedHostnames" -> hashedHostnames.asJson + ) + ) + } + + "Telemetry" should { + "send correct number of events with expected hostnames" in { + val eventCount = 10 + val timeout = (interval * eventCount.toLong) + 1.minutes + val probe = probeTelemetry(telemetryConfig) + TestControl + .execute( + probe.telemetryStream.timeout(timeout).compile.drain.voidError + ) + .flatMap { tc => + tc.tick >> + (1 to eventCount) + .map { i => + probe.hostnameSet.add(i.toString) >> tc.advanceAndTick(interval) + } + .toList + .sequence_ + } + .unsafeRunSync() + val events = probe.telemetryEvents + val expected = (1 to eventCount).map(i => expectedEvent(telemetryConfig, i)).toList + events must beEqualTo(expected) + } + + "not send any events if telemetry is disabled" in { + val probe = probeTelemetry(telemetryConfig.copy(disable = true)) + TestControl + .executeEmbed( + probe.telemetryStream.timeout(interval * 10).compile.drain.voidError + ) + .unsafeRunSync() + probe.telemetryEvents must beEmpty + } + } + + "HostnameSet" should { + "return hashed versions of stored hostnames" in { + val count = 10 + val hostnameSet = Telemetry.HostnameSet.init[IO](telemetryConfig).unsafeRunSync() + (1 to count).foreach(i => hostnameSet.add(s"hostname-$i").unsafeRunSync()) + val expected = (1 to count).map(i => DigestUtils.sha256Hex(s"hostname-$i")).toSet + val result = hostnameSet.getHashed.unsafeRunSync() + result must beEqualTo(expected) + } + + "not store any hostname if telemetry is disabled" in { + val count = 10 + val hostnameSet = Telemetry.HostnameSet.init[IO](telemetryConfig.copy(disable = true)).unsafeRunSync() + (1 to count).foreach(i => hostnameSet.add(s"hostname-$i").unsafeRunSync()) + hostnameSet.getHashed.unsafeRunSync() must beEmpty + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fd7df112e..b03e73514 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,6 +62,7 @@ object Dependencies { val specs2 = "4.11.0" val specs2CE = "1.5.0" val testcontainers = "0.40.10" + val ceTestkit = "3.4.5" object Legacy { val specs2CE = "0.4.1" @@ -120,8 +121,9 @@ object Dependencies { // Scala (test only) // Test common - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test + val ceTestkit = "org.typelevel" %% "cats-effect-testkit" % V.ceTestkit % Test // Test Akka val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test