Skip to content

Commit

Permalink
Modified stream intervals and added start_date
Browse files Browse the repository at this point in the history
Small streams have a 30-second interval;
longer streams have a 1-minute interval between syncs.
  • Loading branch information
Luishfs committed Apr 9, 2024
1 parent 051e02e commit da3558d
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 84 deletions.
5 changes: 3 additions & 2 deletions source-hubspot-native/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ credentials:
credentials_title: OAuth Credentials
refresh_token_sops: ENC[AES256_GCM,data:kTUyVUuNvofeZxm6+DgSAphLlHXWq5WKiloQz3Og6PHMtkPM,iv:M1xSJpdr5JC8hPF4LNkpXS1B8xkX0pCBNT6FkIYdIcU=,tag:GRd1uvS9JMREAZeSyqixRg==,type:str]
token_expiry_date: "2024-02-01T17:01:21.703Z"
start_date: "2024-04-09T12:12:34.898765Z"
sops:
kms: []
gcp_kms:
Expand All @@ -13,8 +14,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-04-02T19:03:51Z"
mac: ENC[AES256_GCM,data:8zgxwCO13ILg+xRWW1CVP+vtEfazwKZOYvZHo6NHyVxw4xUNzjRQuWcw+y9tcHig46OsY4db1pXc9R7tkj5pyZf7OZ2omCHfy9/r46ylYVAlCAsU9UXUSfrXcCHviWqIGkSuHAtfVNTSFXTVUAX9I066w6oGD/iAIQt1Ac4EXSE=,iv:vZW3taXXEwpnLcmtJ7aAlmBjAoeLyd9BdueHrQHEOck=,tag:nMXHQKwMJmmR21VtTmGtBA==,type:str]
lastmodified: "2024-04-09T12:13:21Z"
mac: ENC[AES256_GCM,data:5NdZ3gQI+wPijmW6CxkZ+txTskMqn8VJB2YTA4hEMmUCgpkM71tRGh4ZLV9f0nwVSYRcVOnYiqaQrvi5Fi3sACwMvmQxmSMIvR88Hi1THFYWfTs6040WteNXXGCKOr81puUBHl+DzAKx5nb5ov1UO38Utxnl39jeHotG2ngBtdE=,iv:wX+2pRYXIXnosiAv4Hi5st1U1rJttSPp9oXhGuww0Ro=,tag:iKlD7LdQrKtcHsDFR46bqA==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.8.1
3 changes: 3 additions & 0 deletions source-hubspot-native/source_hubspot_native/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class EndpointConfig(BaseModel):
discriminator="credentials_title",
title="Authentication",
)
start_date: AwareDatetime = Field(
title="start_date",
)


# We use ResourceState directly, without extending it.
Expand Down
93 changes: 42 additions & 51 deletions source-hubspot-native/source_hubspot_native/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,32 @@ async def all_resources(
http.token_source = TokenSource(oauth_spec=OAUTH2_SPEC, credentials=config.credentials)

resources_list = [
crm_object_streamed(Company, http, fetch_recent_companies),
crm_object_streamed(Contact, http, fetch_recent_contacts),
crm_object_streamed(Deal, http, fetch_recent_deals),
crm_object_streamed(Engagement, http, fetch_recent_engagements),
crm_object_streamed_no_batch(ContactLists, http, fetch_recent_contacts_lists),
crm_object_streamed_no_batch(ContactSubscription, http, fetch_contacts_lists_subscription),
crm_object_streamed_no_batch(Campaigns, http, fetch_campaigns),
crm_object_streamed_no_batch(SubscriptionChanges, http, fetch_recent_subscription_changes),
crm_object_streamed_no_batch(EmailEvents, http, fetch_recent_email_events),
crm_object_streamed_no_batch(TicketPipelines, http, fetch_recent_ticket_pipelines),
crm_object_streamed_no_batch(DealPipelines, http, fetch_recent_deal_pipelines),
crm_object_paginated(EngagementCalls, http),
crm_object_paginated(EngagementEmails, http),
crm_object_paginated(EngagementMeetings, http),
crm_object_paginated(EngagementNotes, http),
crm_object_paginated(EngagementTasks, http),
crm_object_paginated(Goals, http),
crm_object_paginated(FeedbackSubmissions, http),
crm_object_paginated(LineItems, http),
crm_object_paginated(Products, http),
crm_object_paginated(Ticket, http),
crm_object_custom(MarketingEmails, http, fetch_recent_marketing_emails),
subscription_object(EmailSubscriptions, http, fetch_email_subscriptions),
crm_object_custom(MarketingForms, http, fetch_marketing_forms),
crm_object_custom(Owners, http, fetch_owners),
workflow_object(Workflows, http, fetch_workflows),
crm_object_streamed(Company, http, fetch_recent_companies, config.start_date),
crm_object_streamed(Contact, http, fetch_recent_contacts, config.start_date),
crm_object_streamed(Deal, http, fetch_recent_deals, config.start_date),
crm_object_streamed(Engagement, http, fetch_recent_engagements, config.start_date),
crm_object_streamed_no_batch(ContactLists, http, fetch_recent_contacts_lists, config.start_date),
crm_object_streamed_no_batch(ContactSubscription, http, fetch_contacts_lists_subscription, config.start_date),
crm_object_streamed_no_batch(Campaigns, http, fetch_campaigns, config.start_date),
crm_object_streamed_no_batch(SubscriptionChanges, http, fetch_recent_subscription_changes, config.start_date),
crm_object_streamed_no_batch(EmailEvents, http, fetch_recent_email_events, config.start_date),
crm_object_streamed_no_batch(TicketPipelines, http, fetch_recent_ticket_pipelines, config.start_date),
crm_object_streamed_no_batch(DealPipelines, http, fetch_recent_deal_pipelines, config.start_date),
crm_object_paginated(EngagementCalls, http, config.start_date),
crm_object_paginated(EngagementEmails, http, config.start_date),
crm_object_paginated(EngagementMeetings, http, config.start_date),
crm_object_paginated(EngagementNotes, http, config.start_date),
crm_object_paginated(EngagementTasks, http, config.start_date),
crm_object_paginated(Goals, http, config.start_date),
crm_object_paginated(FeedbackSubmissions, http, config.start_date),
crm_object_paginated(LineItems, http, config.start_date),
crm_object_paginated(Products, http, config.start_date),
crm_object_paginated(Ticket, http, config.start_date),
crm_object_custom(MarketingEmails, http, fetch_recent_marketing_emails, config.start_date),
subscription_object(EmailSubscriptions, http, fetch_email_subscriptions, config.start_date),
crm_object_custom(MarketingForms, http, fetch_marketing_forms, config.start_date),
crm_object_custom(Owners, http, fetch_owners, config.start_date),
workflow_object(Workflows, http, fetch_workflows, config.start_date),
properties(http),
]

Expand All @@ -118,13 +118,13 @@ async def all_resources(
BaseCRMObject.PROPERTY_SEARCH_NAME = objects["labels"]["plural"].lower()
BaseCRMObject.ASSOCIATED_ENTITIES = []

resources_list.append(custom_objects(BaseCRMObject, http))
resources_list.append(custom_objects(BaseCRMObject, http, config.start_date))

return resources_list


def crm_object_paginated(
cls: type[CRMObject], http: HTTPSession
cls: type[CRMObject], http: HTTPSession, started_at = datetime.now(tz=UTC)
) -> common.Resource:
"""Base Resource to run V3 API objects using pagination
Expand All @@ -150,8 +150,6 @@ def open(
fetch_page=functools.partial(fetch_page, cls, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
key=cls.PRIMARY_KEY,
Expand All @@ -161,12 +159,12 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(seconds=30)),
schema_inference=True,
)

def crm_object_custom(
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn, started_at = datetime.now(tz=UTC)
) -> common.Resource:
"""Custom Resource to run V3 objects using pagination
This endpoint allows for different URL objects from the V3 API
Expand Down Expand Up @@ -196,7 +194,6 @@ def open(
fetch_page=functools.partial(fetch_page_custom, cls, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
Expand All @@ -207,13 +204,13 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(seconds=30)),
schema_inference=True,
)


def crm_object_streamed(
cls: type[CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn
cls: type[CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn, started_at = datetime.now(tz=UTC)
) -> common.Resource:
"""Base Resource to run V1 API objects using stream
This resource uses a batch endpoint from HubSpot. It works
Expand Down Expand Up @@ -242,7 +239,6 @@ def open(
fetch_changes=functools.partial(fetch_changes, cls, fetch_recent, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
Expand All @@ -253,14 +249,14 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(seconds=30)),
schema_inference=True,
)



def crm_object_streamed_no_batch(
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn, started_at = datetime.now(tz=UTC)
) -> common.Resource:
"""Custom Resource to run V1 API objects using stream
This resource does not use the batch function from "crm_object_streamed"
Expand Down Expand Up @@ -289,8 +285,6 @@ def open(
fetch_changes=functools.partial(fetch_changes_no_batch, cls, fetch_recent, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
key=cls.PRIMARY_KEY,
Expand All @@ -300,12 +294,12 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=1)),
schema_inference=True,
)

def workflow_object(
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn, started_at = datetime.now(tz=UTC)
) -> common.Resource:

"""Custom Resource to run specifically workflow stream objects
Expand All @@ -326,7 +320,6 @@ def open(
fetch_page=functools.partial(fetch_page_workflow, cls, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
Expand All @@ -337,12 +330,12 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(seconds=30)),
schema_inference=True,
)

def subscription_object(
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn
cls: type[V1CRMObject], http: HTTPSession, fetch_recent: FetchRecentFn, started_at = datetime.now(tz=UTC)
) -> common.Resource:

"""
Expand All @@ -365,7 +358,6 @@ def open(
fetch_page=functools.partial(fetch_page_subscriptions, cls, http),
)

started_at = datetime.now(tz=UTC)

return common.Resource(
name=cls.NAME,
Expand All @@ -376,7 +368,7 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(seconds=30)),
schema_inference=True,
)

Expand Down Expand Up @@ -418,13 +410,13 @@ def open(
open=open,
initial_state=ResourceState(),
initial_config=ResourceConfig(
name=Names.properties, interval=timedelta(minutes=7)
name=Names.properties, interval=timedelta(seconds=30)
),
schema_inference=True,
)

def custom_objects(
cls: type[CRMObject], http: HTTPSession
cls: type[CRMObject], http: HTTPSession, started_at = datetime.now(tz=UTC)
) -> common.Resource:

"""
Expand All @@ -449,7 +441,6 @@ def open(
)


started_at = datetime.now(tz=UTC)
return common.Resource(
name=cls.NAME,
key=cls.PRIMARY_KEY,
Expand All @@ -459,6 +450,6 @@ def open(
inc=ResourceState.Incremental(cursor=started_at),
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=7)),
initial_config=ResourceConfig(name=cls.NAME, interval=timedelta(minutes=1)),
schema_inference=False,
)
Loading

0 comments on commit da3558d

Please sign in to comment.