Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added comments in the code. #76

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tap_zendesk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ def request_metrics_patch(self, method, url, **kwargs):
Session.request = request_metrics_patch
# end patch

def do_discover(client):
    """Run discovery mode: build the catalog of stream schemas and dump it to stdout."""
    LOGGER.info("Starting discover")
    discovered = discover_streams(client)
    json.dump({"streams": discovered}, sys.stdout, indent=2)
    LOGGER.info("Finished discover")

def stream_is_selected(mdata):
    """Return True when the root-level () metadata entry marks the stream as selected."""
    root_entry = mdata.get((), {})
    return root_entry.get('selected', False)

# Return list of all the selected streams in catalog
def get_selected_streams(catalog):
selected_stream_names = []
for stream in catalog.streams:
Expand All @@ -68,6 +71,7 @@ def get_selected_streams(catalog):
'tickets': ['ticket_audits', 'ticket_metrics', 'ticket_comments']
}

# Return list of all the sub streams of the streams
def get_sub_stream_names():
sub_stream_names = []
for parent_stream in SUB_STREAMS:
Expand All @@ -77,6 +81,7 @@ def get_sub_stream_names():
class DependencyException(Exception):
    """Raised when a selected sub-stream's required parent stream is not selected."""

# Validate and raise exceptions if sub-streams are selected but related parents not selected
def validate_dependencies(selected_stream_ids):
errs = []
msg_tmpl = ("Unable to extract {0} data. "
Expand All @@ -90,21 +95,25 @@ def validate_dependencies(selected_stream_ids):
if errs:
raise DependencyException(" ".join(errs))

def populate_class_schemas(catalog, selected_stream_names):
    """Attach each selected catalog entry to its corresponding stream class in STREAMS."""
    for catalog_entry in catalog.streams:
        stream_id = catalog_entry.tap_stream_id
        if stream_id in selected_stream_names:
            STREAMS[stream_id].stream = catalog_entry

# run sync mode
def do_sync(client, catalog, state, config):

selected_stream_names = get_selected_streams(catalog)
validate_dependencies(selected_stream_names)
populate_class_schemas(catalog, selected_stream_names)
all_sub_stream_names = get_sub_stream_names()

# Loop over streams in catalog
for stream in catalog.streams:
stream_name = stream.tap_stream_id
mdata = metadata.to_map(stream.metadata)
#If stream does not selected then skip it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#If stream does not selected then skip it.
# If the stream is not selected then skip it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

if stream_name not in selected_stream_names:
LOGGER.info("%s: Skipping - not selected", stream_name)
continue
Expand All @@ -120,11 +129,14 @@ def do_sync(client, catalog, state, config):
# LOGGER.info("%s: Starting", stream_name)


# Write schema of streams to STDOUT
key_properties = metadata.get(mdata, (), 'table-key-properties')
singer.write_schema(stream_name, stream.schema.to_dict(), key_properties)

sub_stream_names = SUB_STREAMS.get(stream_name)
# Populate class schemas and write a schema for the selected substreams of the stream
if sub_stream_names:
# Loop over sub-streams of current stream
for sub_stream_name in sub_stream_names:
if sub_stream_name not in selected_stream_names:
continue
Expand All @@ -148,6 +160,7 @@ def do_sync(client, catalog, state, config):
LOGGER.info("Finished sync")
zendesk_metrics.log_aggregate_rates()

#Return dictionary of params for authentication using oauth
def oauth_auth(args):
if not set(OAUTH_CONFIG_KEYS).issubset(args.config.keys()):
LOGGER.debug("OAuth authentication unavailable.")
Expand All @@ -159,6 +172,7 @@ def oauth_auth(args):
"oauth_token": args.config['access_token'],
}

#Return dictionary of params for authentication using api_token
def api_token_auth(args):
if not set(API_TOKEN_CONFIG_KEYS).issubset(args.config.keys()):
LOGGER.debug("API Token authentication unavailable.")
Expand All @@ -171,6 +185,7 @@ def api_token_auth(args):
"token": args.config['api_token']
}

#Return session object to pass in Zenpy class
def get_session(config):
""" Add partner information to requests Session object if specified in the config. """
if not all(k in config for k in ["marketplace_name",
Expand Down
2 changes: 2 additions & 0 deletions tap_zendesk/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
def get_abs_path(path):
    """Resolve *path* relative to the directory containing this module file."""
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)

# Load and return dictionary of referenced schemas from 'schemas/shared'
def load_shared_schema_refs():
ref_sub_path = 'shared'
shared_schemas_path = get_abs_path('schemas/' + ref_sub_path)
Expand All @@ -20,6 +21,7 @@ def load_shared_schema_refs():

return shared_schema_refs

# Discover schemas, build metadata for all the steams and return catalog
def discover_streams(client):
streams = []
refs = load_shared_schema_refs()
Expand Down
7 changes: 7 additions & 0 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
LOGGER = singer.get_logger()


#Check whether error should be retried or not.
def is_fatal(exception):
status_code = exception.response.status_code

Expand All @@ -18,6 +19,7 @@ def is_fatal(exception):

return 400 <= status_code < 500

#Call Api and retry 429 error.
@backoff.on_exception(backoff.expo,
requests.exceptions.HTTPError,
max_tries=10,
Expand All @@ -27,6 +29,7 @@ def call_api(url, params, headers):
response.raise_for_status()
return response

#Retrieve data with cursor based pagination of particular stream
def get_cursor_based(url, access_token, cursor=None, **kwargs):
headers = {
'Content-Type': 'application/json',
Expand All @@ -50,6 +53,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs):

has_more = response_json['meta']['has_more']

#If has_more is true, then fetch next page of data.
while has_more:
cursor = response_json['meta']['after_cursor']
params['page[after]'] = cursor
Expand All @@ -60,6 +64,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs):
yield response_json
has_more = response_json['meta']['has_more']

#Retrieve data with offset based pagination of particular stream
def get_offset_based(url, access_token, **kwargs):
headers = {
'Content-Type': 'application/json',
Expand All @@ -80,13 +85,15 @@ def get_offset_based(url, access_token, **kwargs):

next_url = response_json.get('next_page')

#If next_url is true then fetch next page of data.
while next_url:
response = call_api(next_url, params=None, headers=headers)
response_json = response.json()

yield response_json
next_url = response_json.get('next_page')

#Retrieve data from the incremental exports endpoint using cursor based pagination
def get_incremental_export(url, access_token, start_time):
headers = {
'Content-Type': 'application/json',
Expand Down
Loading