Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-zendesk-support: checkpoint during AuditLogs stream #2070

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 16 additions & 36 deletions source-zendesk-support/source_zendesk_support/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,54 +754,34 @@ class AuditLogs(SourceZendeskSupportCursorPaginationStream):

This endpoint does not respect the start_time param. It requires two query params with the same name to filter by date.
See https://support.zendesk.com/hc/en-us/community/posts/4859612547866-Audit-Log-API-error.
Instead, we mimic the TicketAudits stream strategy of not paginating further when the most recent page contains results we don't need.
"""

# Key in the JSON response body under which the list of records is returned.
response_list_name: str = "audit_logs"
# audit_logs doesn't have the 'updated_by' field
# NOTE(review): 'updated_by' above is possibly a typo for 'updated_at' — confirm against the API schema.
# The record field used as the incremental cursor (read from both records and stream state).
cursor_field = "created_at"
# NOTE(review): presumably the number of records between state checkpoints — confirm against the CDK.
state_checkpoint_interval = 100

def request_params(
    self,
    next_page_token: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
    **kwargs,
) -> MutableMapping[str, Any]:
    """
    Build the query parameters for one page of the audit logs request.

    Results are restricted to records created between the last saved cursor value
    (or the configured start date when there is no cursor yet) and shortly before "now",
    and are requested sorted by "created_at".

    :param next_page_token: pagination parameters for the next page; merged over the base params when present.
    :param stream_state: last saved stream state; may be None or empty (e.g. on a first sync).
    :return: the query parameters to send with the request.
    """
    # stream_state defaults to None, so guard before reading the cursor out of it.
    cursor_value = (stream_state or {}).get(self.cursor_field)
    start_time = cursor_value or self._start_date
    # The end_time is moved a little in the past to avoid missing records that share the same "created_at" since
    # records with the current cursor value are ignored.
    end_time = (datetime.now(tz=UTC) - timedelta(seconds=30)).strftime(DATETIME_FORMAT)

    params = {
        "page[size]": self.page_size,
        # By default, results are returned in descending order.
        # "sort" is required to get results returned in ascending order.
        "sort": "created_at",
        # "filter[created_at][]" filters the responses' results to only records created within the specified timespan.
        "filter[created_at][]": [start_time, end_time],
    }

    if next_page_token:
        params.update(next_page_token)

    return params

def _is_before_last_cursor_date(self, response: requests.Response, stream_state: Mapping[str, Any]) -> bool:
    """
    Check whether the first document of a response was created before the last cursor date
    (if it exists) or the start date.
    This allows us to determine when to stop paginating backwards.

    :param response: the HTTP response for the page that was just read.
    :param stream_state: last saved stream state; may be None or empty.
    :return: True when the first document's cursor value sorts before the cursor date / start date.
        A page with no documents is treated as having an empty cursor value.
    """
    # `or [{}]` covers a missing key, a null value and an empty list alike,
    # so indexing the first element can never raise on an empty page.
    documents = response.json().get(self.response_list_name) or [{}]
    document_created_at = documents[0].get(self.cursor_field, "")
    cursor_date = (stream_state or {}).get(self.cursor_field) or self._start_date
    # NOTE(review): this is a plain comparison of the two values as returned; it presumably relies on
    # both being timestamp strings in the same sortable format — confirm.
    return document_created_at < cursor_date

# Mirrors airbyte_cdk's HttpStream._read_pages, with one extra stop condition: pagination also ends
# once the response contains documents created before our last cursor date / start date.
def _read_pages(
    self,
    records_generator_fn: Callable[
        [requests.PreparedRequest, requests.Response, Mapping[str, Any], Optional[Mapping[str, Any]]], Iterable[StreamData]
    ],
    stream_slice: Optional[Mapping[str, Any]] = None,
    stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
    """
    Fetch pages one at a time and yield the records produced for each page.

    :param records_generator_fn: callable invoked with (request, response, stream_state, stream_slice)
        for every fetched page; everything it produces is yielded.
    :param stream_slice: the slice being read, passed through unchanged.
    :param stream_state: last saved stream state; None is replaced by an empty mapping.
    :return: a generator over the records of every page read (possibly yielding nothing).
    """
    state = stream_state or {}
    page_token = None
    while True:
        request, response = self._fetch_next_page(stream_slice, state, page_token)
        yield from records_generator_fn(request, response, state, stream_slice)

        page_token = self.next_page_token(response)
        if not page_token:
            # No further page is available.
            break
        if self._is_before_last_cursor_date(response, state):
            # This page already reaches behind the cursor date / start date.
            break

class Users(SourceZendeskSupportIncrementalCursorExportStream):
"""Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export-cursor-based"""
Expand Down
Loading