From ad2039a2933f2a9e0af680ac7d0159152b965460 Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Fri, 18 Oct 2024 16:42:32 -0400 Subject: [PATCH] source-hubspot-native: add automatic backfills for delayed incremental streams Delayed incremental streams should be wrapped in a `try/except` block to trigger automatic backfills if we encounter a `MustBackfillBinding` error. I kept the recent incremental streams wrapped in the `try/except` that serves the same purpose since it's possible for `MustBackfillBinding` exceptions to be raised if a task is disabled long enough for the cursor to fall far enough behind the recent changes available from HubSpot. --- .../source_hubspot_native/api.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/source-hubspot-native/source_hubspot_native/api.py b/source-hubspot-native/source_hubspot_native/api.py index 44e9cbe72..c0db2e71e 100644 --- a/source-hubspot-native/source_hubspot_native/api.py +++ b/source-hubspot-native/source_hubspot_native/api.py @@ -368,20 +368,24 @@ async def process_changes( delayed_emitted = 0 if delayed_fetch_next_end - delayed_fetch_next_start > delayed_fetch_minimum_window: # Poll the delayed stream for documents if we need to. - async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end): - if ts > delayed_fetch_next_end: - # In case the FetchDelayedFn is unable to filter based on - # `delayed_fetch_next_end`. - continue - elif ts > delayed_fetch_next_start: - if cache.has_as_recent_as(object_name, key, ts): - cache_hits += 1 + try: + async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end): + if ts > delayed_fetch_next_end: + # In case the FetchDelayedFn is unable to filter based on + # `delayed_fetch_next_end`. continue - - delayed_emitted += 1 - yield obj - else: - break + elif ts > delayed_fetch_next_start: + if cache.has_as_recent_as(object_name, key, ts): + cache_hits += 1 + continue + + delayed_emitted += 1 + yield obj + else: + break + except MustBackfillBinding: + log.info("triggering automatic backfill for %s", object_name) + yield Triggers.BACKFILL last_delayed_fetch_end[object_name] = delayed_fetch_next_end evicted = cache.cleanup(object_name, delayed_fetch_next_end)