Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job_timer tracking to wait_for_export #50

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions tap_marketo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pendulum
import requests
import singer

from singer import metrics

# By default, jobs will run for 3 hours and be polled every 5 minutes.
JOB_TIMEOUT = 60 * 180
Expand Down Expand Up @@ -240,24 +240,25 @@ def stream_export(self, stream_type, export_id):
def wait_for_export(self, stream_type, export_id):
# Poll the export status until it enters a finalized state or
# exceeds the job timeout time.
timeout_time = pendulum.utcnow().add(seconds=self.job_timeout)
while pendulum.utcnow() < timeout_time:
status = self.poll_export(stream_type, export_id)
singer.log_info("export %s status is %s", export_id, status)
with metrics.job_timer('Export {} for {}'.format(export_id, stream_type)):
timeout_time = pendulum.utcnow().add(seconds=self.job_timeout)
while pendulum.utcnow() < timeout_time:
status = self.poll_export(stream_type, export_id)
singer.log_info("export %s status is %s", export_id, status)

if status == "Created":
# If the status is created, the export has been made but
# not started, so enqueue the export.
self.enqueue_export(stream_type, export_id)
if status == "Created":
# If the status is created, the export has been made but
# not started, so enqueue the export.
self.enqueue_export(stream_type, export_id)

elif status in ["Cancelled", "Failed"]:
# Cancelled and failed exports fail the current sync.
raise ExportFailed(status)
elif status in ["Cancelled", "Failed"]:
# Cancelled and failed exports fail the current sync.
raise ExportFailed(status)

elif status == "Completed":
return True
elif status == "Completed":
return True

time.sleep(self.poll_interval)
time.sleep(self.poll_interval)

raise ExportFailed("Export timed out after {} minutes".format(self.job_timeout / 60))

Expand Down