diff --git a/source-hubspot-native/source_hubspot_native/api.py b/source-hubspot-native/source_hubspot_native/api.py index 44e9cbe72..c0db2e71e 100644 --- a/source-hubspot-native/source_hubspot_native/api.py +++ b/source-hubspot-native/source_hubspot_native/api.py @@ -368,20 +368,24 @@ async def process_changes( delayed_emitted = 0 if delayed_fetch_next_end - delayed_fetch_next_start > delayed_fetch_minimum_window: # Poll the delayed stream for documents if we need to. - async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end): - if ts > delayed_fetch_next_end: - # In case the FetchDelayedFn is unable to filter based on - # `delayed_fetch_next_end`. - continue - elif ts > delayed_fetch_next_start: - if cache.has_as_recent_as(object_name, key, ts): - cache_hits += 1 + try: + async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end): + if ts > delayed_fetch_next_end: + # In case the FetchDelayedFn is unable to filter based on + # `delayed_fetch_next_end`. continue - - delayed_emitted += 1 - yield obj - else: - break + elif ts > delayed_fetch_next_start: + if cache.has_as_recent_as(object_name, key, ts): + cache_hits += 1 + continue + + delayed_emitted += 1 + yield obj + else: + break + except MustBackfillBinding: + log.info("triggering automatic backfill for %s", object_name) + yield Triggers.BACKFILL last_delayed_fetch_end[object_name] = delayed_fetch_next_end evicted = cache.cleanup(object_name, delayed_fetch_next_end)