From 79debb0a753326f3b4e7b306ec37ba3d9e27b9e1 Mon Sep 17 00:00:00 2001 From: Diego Fernandez Date: Wed, 26 Jun 2019 22:18:02 -0600 Subject: [PATCH] Add job_timer tracking to wait_for_export Fixes #49 --- tap_marketo/client.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tap_marketo/client.py b/tap_marketo/client.py index 924440f..449a90c 100644 --- a/tap_marketo/client.py +++ b/tap_marketo/client.py @@ -4,7 +4,7 @@ import pendulum import requests import singer - +from singer import metrics # By default, jobs will run for 3 hours and be polled every 5 minutes. JOB_TIMEOUT = 60 * 180 @@ -240,24 +240,25 @@ def stream_export(self, stream_type, export_id): def wait_for_export(self, stream_type, export_id): # Poll the export status until it enters a finalized state or # exceeds the job timeout time. - timeout_time = pendulum.utcnow().add(seconds=self.job_timeout) - while pendulum.utcnow() < timeout_time: - status = self.poll_export(stream_type, export_id) - singer.log_info("export %s status is %s", export_id, status) + with metrics.job_timer('Export {} for {}'.format(export_id, stream_type)): + timeout_time = pendulum.utcnow().add(seconds=self.job_timeout) + while pendulum.utcnow() < timeout_time: + status = self.poll_export(stream_type, export_id) + singer.log_info("export %s status is %s", export_id, status) - if status == "Created": - # If the status is created, the export has been made but - # not started, so enqueue the export. - self.enqueue_export(stream_type, export_id) + if status == "Created": + # If the status is created, the export has been made but + # not started, so enqueue the export. + self.enqueue_export(stream_type, export_id) - elif status in ["Cancelled", "Failed"]: - # Cancelled and failed exports fail the current sync. - raise ExportFailed(status) + elif status in ["Cancelled", "Failed"]: + # Cancelled and failed exports fail the current sync. + raise ExportFailed(status) - elif status == "Completed": - return True + elif status == "Completed": + return True - time.sleep(self.poll_interval) + time.sleep(self.poll_interval) raise ExportFailed("Export timed out after {} minutes".format(self.job_timeout / 60))