Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/3.2.0 #257

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 3.2.0 (2024-08-20)
--------------------------
Add a function that discovers and looks up schemas without using the list endpoint of an Iglu server (#256)
maxJsonDepth when validating JSON (#254)

Version 3.1.1 (2024-07-03)
--------------------------
Fix listSchemasLike when multiple Iglu repositories host the schemas (#252)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ Iglu Scala Client is used extensively in **[Snowplow][snowplow-repo]** to valida

## Installation

The latest version of Iglu Scala Client is 3.1.1, which works with Scala 2.12, 2.13, and 3.
The latest version of Iglu Scala Client is 3.2.0, which works with Scala 2.12, 2.13, and 3.

If you're using SBT, add the following lines to your build file:

```scala
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "3.1.1"
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % "3.2.0"
```

## API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ import io.circe.{DecodingFailure, Json}
*/
final class IgluCirceClient[F[_]] private (
resolver: Resolver[F],
schemaEvaluationCache: SchemaEvaluationCache[F]
schemaEvaluationCache: SchemaEvaluationCache[F],
maxJsonDepth: Int
) {
@deprecated("Use `IgluCirceClient(resolver, cache, maxJsonDepth)`", "3.2.0")
def this(resolver: Resolver[F], cache: SchemaEvaluationCache[F]) =
this(resolver, cache, Int.MaxValue)

def check(
instance: SelfDescribingData[Json]
)(implicit
Expand All @@ -50,7 +55,8 @@ final class IgluCirceClient[F[_]] private (
resolver.lookupSchemaResult(instance.schema, resolveSupersedingSchema = true)
)
validation =
CirceValidator.WithCaching.validate(schemaEvaluationCache)(instance.data, resolverResult)
CirceValidator.WithCaching
.validate(schemaEvaluationCache)(instance.data, resolverResult, maxJsonDepth)
_ <- EitherT(validation).leftMap(e =>
e.toClientError(resolverResult.value.supersededBy.map(_.asString))
)
Expand All @@ -61,21 +67,36 @@ final class IgluCirceClient[F[_]] private (

object IgluCirceClient {

@deprecated("Use `parseDefault(json, maxJsonDepth)`", "3.2.0")
def parseDefault[F[_]: Monad: CreateResolverCache: InitValidatorCache](
json: Json
): EitherT[F, DecodingFailure, IgluCirceClient[F]] =
parseDefault(json, Int.MaxValue)

def parseDefault[F[_]: Monad: CreateResolverCache: InitValidatorCache](
json: Json,
maxJsonDepth: Int
): EitherT[F, DecodingFailure, IgluCirceClient[F]] =
for {
config <- EitherT.fromEither[F](Resolver.parseConfig(json))
resolver <- Resolver.fromConfig[F](config)
client <- EitherT.liftF(fromResolver(resolver, config.cacheSize))
client <- EitherT.liftF(fromResolver(resolver, config.cacheSize, maxJsonDepth))
} yield client

@deprecated("Use `fromResolver(resolver, cacheSize, maxJsonDepth)`", "3.2.0")
def fromResolver[F[_]: Monad: InitValidatorCache](
resolver: Resolver[F],
cacheSize: Int
): F[IgluCirceClient[F]] =
fromResolver(resolver, cacheSize, Int.MaxValue)

def fromResolver[F[_]: Monad: InitValidatorCache](
resolver: Resolver[F],
cacheSize: Int,
maxJsonDepth: Int
): F[IgluCirceClient[F]] = {
schemaEvaluationCache[F](cacheSize).map { cache =>
new IgluCirceClient(resolver, cache)
new IgluCirceClient(resolver, cache, maxJsonDepth)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
private[client] val allRepos: NonEmptyList[Registry] =
NonEmptyList[Registry](Registry.EmbeddedRegistry, repos)

private val allIgluCentral: Set[String] = repos.collect {
case Registry.Http(config, connection)
if connection.uri.getHost.matches(""".*\biglucentral\b.*""") =>
config.name
}.toSet

/**
* Tries to find the given schema in any of the provided repository refs
* If any of repositories gives non-non-found error, lookup will retried
Expand Down Expand Up @@ -181,6 +187,71 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
): F[Either[ResolutionError, Json]] =
lookupSchemaResult(schemaKey).map(_.map(_.value.schema))

/**
* If Iglu Central or any of its mirrors doesn't have a schema,
* it should be considered NotFound, even if one of them returned an error.
*/
private[resolver] def isNotFound(error: ResolutionError): Boolean = {
val (igluCentral, custom) = error.value.partition { case (repo, _) =>
allIgluCentral.contains(repo)
}
(igluCentral.isEmpty || igluCentral.values.exists(
_.errors.exists(_ == RegistryError.NotFound)
)) && custom.values.flatMap(_.errors).forall(_ == RegistryError.NotFound)
}

/**
* Looks up all the schemas with the same model until `maxSchemaKey`.
* For the schemas of previous revisions, it starts with addition = 0
* and increments it until a NotFound.
*
* @param maxSchemaKey The SchemaKey until which schemas of the same model should get returned
* @return All the schemas if all went well, [[Resolver.SchemaResolutionError]] with the first error that happened
* while looking up the schemas if something went wrong.
*/
def lookupSchemasUntil(
maxSchemaKey: SchemaKey
)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
def go(
current: SchemaVer.Full,
acc: List[SelfDescribingSchema[Json]]
): F[Either[SchemaResolutionError, NonEmptyList[SelfDescribingSchema[Json]]]] = {
val currentSchemaKey = maxSchemaKey.copy(version = current)
lookupSchema(currentSchemaKey).flatMap {
case Left(e) =>
if (current.addition === 0)
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
else if (current.revision < maxSchemaKey.version.revision && isNotFound(e))
go(current.copy(revision = current.revision + 1, addition = 0), acc)
else
Monad[F].pure(Left(SchemaResolutionError(currentSchemaKey, e)))
case Right(json) =>
if (current.revision < maxSchemaKey.version.revision)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else if (current.addition < maxSchemaKey.version.addition)
go(
current.copy(addition = current.addition + 1),
SelfDescribingSchema(SchemaMap(currentSchemaKey), json) :: acc
)
else
Monad[F].pure(
Right(
NonEmptyList(SelfDescribingSchema(SchemaMap(currentSchemaKey), json), acc).reverse
)
)
}
}

go(SchemaVer.Full(maxSchemaKey.version.model, 0, 0), Nil)
}

/**
* Get list of available schemas for particular vendor and name part
* Server supposed to return them in proper order
Expand Down Expand Up @@ -389,6 +460,8 @@ object Resolver {
*/
case class SchemaItem(schema: Json, supersededBy: SupersededBy)

case class SchemaResolutionError(schemaKey: SchemaKey, error: ResolutionError)

/** The result of doing a lookup with the resolver, carrying information on whether the cache was used */
sealed trait ResolverResult[+K, +A] {
def value: A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._

// Cats
import cats.Monad
import cats.data.{EitherNel, NonEmptyList}
import cats.data.NonEmptyList
import cats.syntax.all._

// Jackson
Expand Down Expand Up @@ -87,35 +87,37 @@ object CirceValidator extends Validator[Json] {
private lazy val V4Schema =
V4SchemaInstance.getSchema(new ObjectMapper().readTree(MetaSchemas.JsonSchemaV4Text))

def validate(data: Json, schema: Json): Either[ValidatorError, Unit] = {
val jacksonJson = circeToJackson(schema)
evaluateSchema(jacksonJson)
.flatMap { schema =>
validateOnReadySchema(schema, data).leftMap(ValidatorError.InvalidData.apply)
}
}

def checkSchema(schema: Json): List[ValidatorError.SchemaIssue] = {
val jacksonJson = circeToJackson(schema)
validateSchemaAgainstV4(jacksonJson)
def validate(data: Json, schema: Json): Either[ValidatorError, Unit] =
for {
jacksonJson <- circeToJackson(schema, Int.MaxValue).leftMap(_.toInvalidSchema)
schema <- evaluateSchema(jacksonJson)
_ <- validateOnReadySchema(schema, data, Int.MaxValue)
} yield ()

@deprecated("Use `checkSchema(schema, maxJsonDepth)`", "3.2.0")
def checkSchema(schema: Json): List[ValidatorError.SchemaIssue] =
checkSchema(schema, Int.MaxValue)

def checkSchema(schema: Json, maxJsonDepth: Int): List[ValidatorError.SchemaIssue] = {
circeToJackson(schema, maxJsonDepth) match {
case Left(e) => List(e.toSchemaIssue)
case Right(jacksonJson) => validateSchemaAgainstV4(jacksonJson)
}
}

/** Validate instance against schema and return same instance */
private def validateOnReadySchema(
schema: JsonSchema,
instance: Json
): EitherNel[ValidatorReport, Unit] = {
val messages = schema
.validate(circeToJackson(instance))
.asScala
.toList
.map(fromValidationMessage)

messages match {
case x :: xs => NonEmptyList(x, xs).asLeft
case Nil => ().asRight
}
}
instance: Json,
maxJsonDepth: Int
): Either[ValidatorError.InvalidData, Unit] =
for {
jacksonJson <- circeToJackson(instance, maxJsonDepth).leftMap(_.toInvalidData)
_ <- schema.validate(jacksonJson).asScala.toList.map(fromValidationMessage) match {
case x :: xs => ValidatorError.InvalidData(NonEmptyList(x, xs)).asLeft
case Nil => ().asRight
}
} yield ()

private def fromValidationMessage(m: ValidationMessage): ValidatorReport =
ValidatorReport(m.getMessage, m.getPath.some, m.getArguments.toList, m.getType.some)
Expand Down Expand Up @@ -154,41 +156,48 @@ object CirceValidator extends Validator[Json] {

def validate[F[_]: Monad](
schemaEvaluationCache: SchemaEvaluationCache[F]
)(data: Json, schema: SchemaLookupResult): F[Either[ValidatorError, Unit]] = {
getFromCacheOrEvaluate(schemaEvaluationCache)(schema)
)(
data: Json,
schema: SchemaLookupResult,
maxJsonDepth: Int
): F[Either[ValidatorError, Unit]] = {
getFromCacheOrEvaluate(schemaEvaluationCache)(schema, maxJsonDepth)
.map {
_.flatMap { jsonschema =>
validateOnReadySchema(jsonschema, data)
.leftMap(ValidatorError.InvalidData.apply)
validateOnReadySchema(jsonschema, data, maxJsonDepth)
}
}
}

private def getFromCacheOrEvaluate[F[_]: Monad](
evaluationCache: SchemaEvaluationCache[F]
)(result: SchemaLookupResult): F[Either[ValidatorError.InvalidSchema, JsonSchema]] = {
)(
result: SchemaLookupResult,
maxJsonDepth: Int
): F[Either[ValidatorError.InvalidSchema, JsonSchema]] = {
result match {
case ResolverResult.Cached(key, SchemaItem(schema, _), timestamp) =>
evaluationCache.get((key, timestamp)).flatMap {
case Some(alreadyEvaluatedSchema) =>
alreadyEvaluatedSchema.pure[F]
case None =>
provideNewJsonSchema(schema)
provideNewJsonSchema(schema, maxJsonDepth)
.pure[F]
.flatTap(result => evaluationCache.put((key, timestamp), result))
}
case ResolverResult.NotCached(SchemaItem(schema, _)) =>
provideNewJsonSchema(schema).pure[F]
provideNewJsonSchema(schema, maxJsonDepth).pure[F]
}
}

private def provideNewJsonSchema(
schema: Json
schema: Json,
maxJsonDepth: Int
): Either[ValidatorError.InvalidSchema, JsonSchema] = {
val schemaAsNode = circeToJackson(schema)
for {
_ <- validateSchema(schemaAsNode)
evaluated <- evaluateSchema(schemaAsNode)
schemaAsNode <- circeToJackson(schema, maxJsonDepth).leftMap(_.toInvalidSchema)
_ <- validateSchema(schemaAsNode)
evaluated <- evaluateSchema(schemaAsNode)
} yield evaluated
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2014-2024 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package io.circe.jackson.snowplow

import cats.data.NonEmptyList
import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport}

sealed trait CirceToJsonError extends Product with Serializable {
def message: String

def toInvalidData: ValidatorError.InvalidData =
ValidatorError.InvalidData(
NonEmptyList.one(
ValidatorReport(message, Some("/"), List.empty, None)
)
)

def toInvalidSchema: ValidatorError.InvalidSchema =
ValidatorError.InvalidSchema(NonEmptyList.one(toSchemaIssue))

def toSchemaIssue: ValidatorError.SchemaIssue =
ValidatorError.SchemaIssue(path = "/", message = message)
}

object CirceToJsonError {
case object MaxDepthExceeded extends CirceToJsonError {
override def message: String = "Maximum allowed JSON depth exceeded"
}
}
Loading
Loading