Skip to content

Commit

Permalink
Attempt to trigger sinking events in separate context boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
peel committed Jul 15, 2024
1 parent 6e134ad commit e34b8a5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ class Routes[F[_]: Async](

implicit val dns: Dns[F] = Dns.forSync[F]

private val healthRoutes = HttpRoutes.of[F] {
private val healthRoutes = HttpRoutes.strict[F] {
case GET -> Root / "health" =>
Ok("ok")
} <+> HttpRoutes.of[F] {
case GET -> Root / "sink-health" =>
service
.sinksHealthy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.typelevel.ci._
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

import com.snowplowanalytics.snowplow.collector.core.model._
import cats.effect.kernel.Async

trait IService[F[_]] {
def preflightResponse(req: Request[F]): F[Response[F]]
Expand All @@ -54,7 +55,7 @@ object Service {
val spAnonymousNuid = "00000000-0000-0000-0000-000000000000"
}

class Service[F[_]: Sync](
class Service[F[_]: Async](
config: Config[Any],
sinks: Sinks[F],
appInfo: AppInfo
Expand Down Expand Up @@ -121,15 +122,14 @@ class Service[F[_]: Sync](
`Access-Control-Allow-Credentials`().toRaw1.some
).flatten
responseHeaders = Headers(headerList ++ bounceLocationHeaders(config.cookieBounce, shouldBounce, request))
_ <- if (!doNotTrack && !shouldBounce) sinkEvent(event, partitionKey) else Sync[F].unit
resp = buildHttpResponse(
queryParams = request.uri.query.params,
headers = responseHeaders,
redirect = redirect,
pixelExpected = pixelExpected,
shouldBounce = shouldBounce
)
} yield resp
_ <- if (!doNotTrack && !shouldBounce) Async[F].start(sinkEvent(event, partitionKey)) else Async[F].unit
} yield buildHttpResponse(
queryParams = request.uri.query.params,
headers = responseHeaders,
redirect = redirect,
pixelExpected = pixelExpected,
shouldBounce = shouldBounce
)

override def sinksHealthy: F[Boolean] = (sinks.good.isHealthy, sinks.bad.isHealthy).mapN(_ && _)

Expand Down Expand Up @@ -323,7 +323,7 @@ class Service[F[_]: Sync](
): F[Unit] =
for {
// Split events into Good and Bad
eventSplit <- Sync[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes))
eventSplit <- Async[F].delay(splitBatch.splitAndSerializePayload(event, sinks.good.maxBytes))
// Send events to respective sinks
_ <- sinks.good.storeRawEvents(eventSplit.good, partitionKey)
_ <- sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
Expand Down

0 comments on commit e34b8a5

Please sign in to comment.