Skip to content

Commit

Permalink
Not Found results should be cached for as long as the TTL (close #248)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jun 6, 2023
1 parent a3e5ded commit 9576246
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ object Resolver {
now: Instant
): List[Registry] = {
val errorsToRetry = failuresMap.filter {
case (_, LookupHistory(errors, _, _)) if errors.contains(RegistryError.NotFound) =>
false
case (_, LookupHistory(errors, _, _)) if errors.isEmpty =>
// The NotFounds have expired because of the cache TTL
true
case (_: Registry.Embedded | _: Registry.InMemory, _) =>
false
case (_, LookupHistory(_, attempts, lastAttempt)) =>
Expand All @@ -613,13 +618,13 @@ object Resolver {
Either.cond(ConfigurationSchema.matches(datum.schema), (), failure)
}

// Minimum backoff period for retry
private val MinBackoff = 500 // ms
// Minimum and maximum backoff periods for retry after server/network errors
private val MinBackoff = 500L // ms
private val MaxBackoff = 60000L // ms

// Count how many milliseconds the Resolver needs to wait before retrying
// TODO: This should not exceed TTL
private def backoff(retryCount: Int): Long =
retryCount match {
private def backoff(retryCount: Int): Long = {
val backoff = retryCount match {
case c if c > 20 => 1200000L + (retryCount * 100L)
case c if c > 12 =>
MinBackoff + Math.pow(2, retryCount.toDouble).toLong + 5000L
Expand All @@ -628,4 +633,6 @@ object Resolver {
case _ =>
MinBackoff + Math.pow(4, retryCount.toDouble).toLong
}
Math.min(backoff, MaxBackoff)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.snowplowanalytics.lrumap.LruMap
// Iglu core
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList}

import com.snowplowanalytics.iglu.client.resolver.registries.RegistryError

import Resolver.SchemaItem

/**
Expand Down Expand Up @@ -173,16 +175,26 @@ object ResolverCache {
ttl: Option[TTL],
c: LruMap[F, K, CacheEntry[Lookup[A]]],
key: K
): F[Option[TimestampedItem[Lookup[A]]]] = {
val wrapped = for {
(storageTime: StorageTime, lookup) <- OptionT(c.get(key))
currentTime <- OptionT.liftF(Clock[F].realTime)
_ = currentTime
if isViable(ttl, currentTime, storageTime) || lookup.isLeft // isLeft
} yield TimestampedItem(lookup, storageTime)

wrapped.value
}
): F[Option[TimestampedItem[Lookup[A]]]] =
OptionT(c.get(key))
.semiflatMap[Option[TimestampedItem[Lookup[A]]]] { case (storageTime, lookup) =>
Clock[F].realTime.map { currentTime =>
if (isViable(ttl, currentTime, storageTime))
Some(TimestampedItem(lookup, storageTime))
else {
lookup match {
case Right(_) => None
case Left(failures) =>
val noNotFounds = failures.map { case (k, history) =>
k -> history.copy(errors = history.errors - RegistryError.NotFound)
}
Some(TimestampedItem(Left(noNotFounds), storageTime))
}
}
}
}
.subflatMap { case o: Option[TimestampedItem[Lookup[A]]] => o }
.value

def getItem[F[_]: Monad: Clock, A, K](
ttl: Option[TTL],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE
a Resolver should not spam the registry with similar requests $e15
a Resolver should return superseding schema if resolveSupersedingSchema is true $e16
a Resolver shouldn't return superseding schema if resolveSupersedingSchema is false $e17
a Resolver should return cached "not found" when ttl not exceeded $e18
a Resolver should update cached "not found" when ttl exceeded $e19
"""

import ResolverSpec._
Expand Down Expand Up @@ -638,4 +640,114 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE
)
}
}

def e18 = {
val schemaKey =
SchemaKey(
"com.snowplowanalytics.iglu-test",
"mock_schema",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
val schema =
Json.Null.asRight[RegistryError]
val notFound = RegistryError.NotFound.asLeft[Json]
val responses = List(notFound, schema)

val httpRep =
Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null)

implicit val cache = ResolverSpecHelpers.staticResolverCache
implicit val clock = ResolverSpecHelpers.staticClock
implicit val registryLookup: RegistryLookup[StaticLookup] =
ResolverSpecHelpers.getLookup(responses, Nil)

val result = for {
resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), httpRep)
response1 <- resolver.lookupSchemaResult(schemaKey)
_ <- StaticLookup.addTime(150.seconds) // ttl 200, delay 150
response2 <- resolver.lookupSchemaResult(schemaKey)
_ <- StaticLookup.addTime(100.seconds) // ttl 200, total delay 250
response3 <- resolver.lookupSchemaResult(schemaKey)
} yield (response1, response2, response3)

val (_, (response1, response2, response3)) =
result.run(ResolverSpecHelpers.RegistryState.init).value

val firstNotFound = response1 must beLeft[ResolutionError].like {
case ResolutionError(history) =>
history must haveValue(
LookupHistory(Set(RegistryError.NotFound), 1, Instant.ofEpochMilli(3L))
)
}

val firstAndSecondEqual = response1 must beEqualTo(response2)

val thirdSucceeded = response3 must beRight[SchemaLookupResult].like {
case ResolverResult.Cached(key, SchemaItem(value, _), _) =>
key must beEqualTo(schemaKey) and (value must beEqualTo(Json.Null))
}

firstNotFound and firstAndSecondEqual and thirdSucceeded

}

def e19 = {
val schemaKey =
SchemaKey(
"com.snowplowanalytics.iglu-test",
"mock_schema",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
val schema =
Json.Null.asRight[RegistryError]
val notFound = RegistryError.NotFound.asLeft[Json]
val responses = List(notFound, notFound, schema)

val repoName = "Mock Repo"
val httpRep =
Registry.Http(Registry.Config(repoName, 1, List("com.snowplowanalytics.iglu-test")), null)

implicit val cache = ResolverSpecHelpers.staticResolverCache
implicit val clock = ResolverSpecHelpers.staticClock
implicit val registryLookup: RegistryLookup[StaticLookup] =
ResolverSpecHelpers.getLookup(responses, Nil)

val result = for {
resolver <- Resolver.init[StaticLookup](10, Some(200.seconds), httpRep)
response1 <- resolver.lookupSchemaResult(schemaKey)
_ <- StaticLookup.addTime(250.seconds) // ttl 200, delay 250
response2 <- resolver.lookupSchemaResult(schemaKey)
_ <- StaticLookup.addTime(100.seconds) // ttl 200, total delay 350
response3 <- resolver.lookupSchemaResult(schemaKey)
} yield (response1, response2, response3)

val (_, (response1, response2, response3)) =
result.run(ResolverSpecHelpers.RegistryState.init).value

val firstNotFound = response1 must beLeft[ResolutionError].like {
case ResolutionError(history) =>
history must haveValue(
LookupHistory(Set(RegistryError.NotFound), 1, Instant.ofEpochMilli(3L))
)
}

val secondNotCached = response1 must beLeft[ResolutionError].like {
case ResolutionError(history1) =>
val LookupHistory(_, attempts1, lastAttempt1) = history1(repoName)
response2 must beLeft[ResolutionError].like { case ResolutionError(history2) =>
val LookupHistory(_, attempts2, lastAttempt2) = history2(repoName)
(attempts1 must beEqualTo(1)) and
(attempts2 must beEqualTo(2)) and
(lastAttempt2 mustNotEqual lastAttempt1)
}
}

val secondAndThirdEqual = response2 must beEqualTo(response3)

firstNotFound and secondNotCached and secondAndThirdEqual

}

}

0 comments on commit 9576246

Please sign in to comment.