-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
25 changed files
with
800 additions
and
434 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
...t/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorContainer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Apache License Version 2.0, and | ||
* you may not use this file except in compliance with the Apache License | ||
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at | ||
* http://www.apache.org/licenses/LICENSE-2.0. | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the Apache License Version 2.0 is distributed on an "AS | ||
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
* implied. See the Apache License Version 2.0 for the specific language | ||
* governing permissions and limitations there under. | ||
*/ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream.it | ||
|
||
import org.testcontainers.containers.GenericContainer | ||
|
||
case class CollectorContainer( | ||
container: GenericContainer[_], | ||
host: String, | ||
port: Int | ||
) |
24 changes: 24 additions & 0 deletions
24
...c/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/CollectorOutput.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Apache License Version 2.0, and | ||
* you may not use this file except in compliance with the Apache License | ||
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at | ||
* http://www.apache.org/licenses/LICENSE-2.0. | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the Apache License Version 2.0 is distributed on an "AS | ||
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
* implied. See the Apache License Version 2.0 for the specific language | ||
* governing permissions and limitations there under. | ||
*/ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream.it | ||
|
||
import com.snowplowanalytics.snowplow.badrows.BadRow | ||
|
||
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload | ||
|
||
case class CollectorOutput( | ||
good: List[CollectorPayload], | ||
bad: List[BadRow] | ||
) |
62 changes: 62 additions & 0 deletions
62
...rc/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/EventGenerator.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Apache License Version 2.0, and | ||
* you may not use this file except in compliance with the Apache License | ||
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at | ||
* http://www.apache.org/licenses/LICENSE-2.0. | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the Apache License Version 2.0 is distributed on an "AS | ||
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
* implied. See the Apache License Version 2.0 for the specific language | ||
* governing permissions and limitations there under. | ||
*/ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream.it | ||
|
||
import cats.effect.IO | ||
|
||
import org.http4s.{Method, Request, Uri} | ||
|
||
object EventGenerator { | ||
|
||
def sendEvents( | ||
collectorHost: String, | ||
collectorPort: Int, | ||
nbGood: Int, | ||
nbBad: Int, | ||
maxBytes: Int | ||
): IO[Unit] = { | ||
val requests = generateEvents(collectorHost, collectorPort, nbGood, nbBad, maxBytes) | ||
Http.statuses(requests) | ||
.flatMap { responses => | ||
responses.collect { case resp if resp.code != 200 => resp.reason } match { | ||
case Nil => IO.unit | ||
case errors => IO.raiseError(new RuntimeException(s"${errors.size} requests were not successful. Example error: ${errors.head}")) | ||
} | ||
} | ||
} | ||
|
||
def generateEvents( | ||
collectorHost: String, | ||
collectorPort: Int, | ||
nbGood: Int, | ||
nbBad: Int, | ||
maxBytes: Int | ||
): List[Request[IO]] = { | ||
val good = List.fill(nbGood)(mkTp2Event(collectorHost, collectorPort, valid = true, maxBytes)) | ||
val bad = List.fill(nbBad)(mkTp2Event(collectorHost, collectorPort, valid = false, maxBytes)) | ||
good ++ bad | ||
} | ||
|
||
def mkTp2Event( | ||
collectorHost: String, | ||
collectorPort: Int, | ||
valid: Boolean = true, | ||
maxBytes: Int = 100 | ||
): Request[IO] = { | ||
val uri = Uri.unsafeFromString(s"http://$collectorHost:$collectorPort/com.snowplowanalytics.snowplow/tp2") | ||
val body = if (valid) "foo" else "a" * (maxBytes + 1) | ||
Request[IO](Method.POST, uri).withEntity(body) | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/Http.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Apache License Version 2.0, and | ||
* you may not use this file except in compliance with the Apache License | ||
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at | ||
* http://www.apache.org/licenses/LICENSE-2.0. | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the Apache License Version 2.0 is distributed on an "AS | ||
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
* implied. See the Apache License Version 2.0 for the specific language | ||
* governing permissions and limitations there under. | ||
*/ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream.it | ||
|
||
import cats.effect.{IO, Resource} | ||
import cats.implicits._ | ||
import org.http4s.blaze.client.BlazeClientBuilder | ||
import org.http4s.client.Client | ||
import org.http4s.{Request, Response, Status} | ||
|
||
object Http { | ||
|
||
def statuses(requests: List[Request[IO]]): IO[List[Status]] = | ||
mkClient.use { client => requests.traverse(client.status) } | ||
|
||
def status(request: Request[IO]): IO[Status] = | ||
mkClient.use { client => client.status(request) } | ||
|
||
def response(request: Request[IO]): IO[Response[IO]] = | ||
mkClient.use(c => c.run(request).use(resp => IO.pure(resp))) | ||
|
||
def responses(requests: List[Request[IO]]): IO[List[Response[IO]]] = | ||
mkClient.use(c => requests.traverse(r => c.run(r).use(resp => IO.pure(resp)))) | ||
|
||
def mkClient: Resource[IO, Client[IO]] = | ||
BlazeClientBuilder.apply[IO].resource | ||
} |
139 changes: 139 additions & 0 deletions
139
http4s/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/utils.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
/* | ||
* Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Apache License Version 2.0, and | ||
* you may not use this file except in compliance with the Apache License | ||
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at | ||
* http://www.apache.org/licenses/LICENSE-2.0. | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the Apache License Version 2.0 is distributed on an "AS | ||
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
* implied. See the Apache License Version 2.0 for the specific language | ||
* governing permissions and limitations there under. | ||
*/ | ||
package com.snowplowanalytics.snowplow.collectors.scalastream.it | ||
|
||
import scala.concurrent.duration._ | ||
|
||
import org.apache.thrift.TDeserializer | ||
|
||
import org.slf4j.LoggerFactory | ||
|
||
import org.testcontainers.containers.GenericContainer | ||
import org.testcontainers.containers.output.Slf4jLogConsumer | ||
|
||
import io.circe.{Json, parser} | ||
|
||
import cats.implicits._ | ||
|
||
import cats.effect.IO | ||
|
||
import retry.syntax.all._ | ||
import retry.RetryPolicies | ||
|
||
import com.snowplowanalytics.snowplow.badrows.BadRow | ||
|
||
import com.snowplowanalytics.iglu.core.SelfDescribingData | ||
import com.snowplowanalytics.iglu.core.circe.implicits._ | ||
|
||
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload | ||
|
||
object utils { | ||
|
||
def parseCollectorPayload(bytes: Array[Byte]): CollectorPayload = { | ||
val deserializer = new TDeserializer() | ||
val target = new CollectorPayload() | ||
deserializer.deserialize(target, bytes) | ||
target | ||
} | ||
|
||
def parseBadRow(bytes: Array[Byte]): BadRow = { | ||
val str = new String(bytes) | ||
val parsed = for { | ||
json <- parser.parse(str).leftMap(_.message) | ||
sdj <- SelfDescribingData.parse(json).leftMap(_.message("Can't decode JSON as SDJ")) | ||
br <- sdj.data.as[BadRow].leftMap(_.getMessage()) | ||
} yield br | ||
parsed match { | ||
case Right(br) => br | ||
case Left(err) => throw new RuntimeException(s"Can't parse bad row. Error: $err") | ||
} | ||
} | ||
|
||
def printBadRows(testName: String, badRows: List[BadRow]): IO[Unit] = { | ||
log(testName, "Bad rows:") *> | ||
badRows.traverse_(br => log(testName, br.compact)) | ||
} | ||
|
||
def log(testName: String, line: String): IO[Unit] = | ||
IO(println(s"[$testName] $line")) | ||
|
||
def startContainerWithLogs( | ||
container: GenericContainer[_], | ||
loggerName: String | ||
): GenericContainer[_] = { | ||
container.start() | ||
val logger = LoggerFactory.getLogger(loggerName) | ||
val logs = new Slf4jLogConsumer(logger) | ||
container.followOutput(logs) | ||
container | ||
} | ||
|
||
def waitWhile[A]( | ||
a: A, | ||
condition: A => Boolean, | ||
maxDelay: FiniteDuration | ||
): IO[Boolean] = { | ||
val retryPolicy = RetryPolicies.limitRetriesByCumulativeDelay( | ||
maxDelay, | ||
RetryPolicies.capDelay[IO]( | ||
2.second, | ||
RetryPolicies.fullJitter[IO](1.second) | ||
) | ||
) | ||
|
||
IO(condition(a)).retryingOnFailures( | ||
result => IO(!result), | ||
retryPolicy, | ||
(_, _) => IO.unit | ||
) | ||
} | ||
|
||
/** Return a list of config parameters from a raw JSON string. */ | ||
def getConfigParameters(config: String): List[String] = { | ||
val parsed: Json = parser.parse(config).valueOr { case failure => | ||
throw new IllegalArgumentException("Can't parse JSON", failure.underlying) | ||
} | ||
|
||
def flatten(value: Json): Option[List[(String, Json)]] = | ||
value.asObject.map( | ||
_.toList.flatMap { | ||
case (k, v) => flatten(v) match { | ||
case None => List(k -> v) | ||
case Some(fields) => fields.map { | ||
case (innerK, innerV) => s"$k.$innerK" -> innerV | ||
} | ||
} | ||
} | ||
) | ||
|
||
def withSpaces(s: String): String = if(s.contains(" ")) s""""$s"""" else s | ||
|
||
val fields = flatten(parsed).getOrElse(throw new IllegalArgumentException("Couldn't flatten fields")) | ||
|
||
fields.flatMap { | ||
case (k, v) if v.isString => | ||
List(s"-D$k=${withSpaces(v.asString.get)}") | ||
case (k, v) if v.isArray => | ||
v.asArray.get.toList.zipWithIndex.map { | ||
case (s, i) if s.isString => | ||
s"-D$k.$i=${withSpaces(s.asString.get)}" | ||
case (other, i) => | ||
s"-D$k.$i=${withSpaces(other.toString)}" | ||
} | ||
case (k, v) => | ||
List(s"-D$k=${withSpaces(v.toString)}") | ||
} | ||
} | ||
} |
Oops, something went wrong.