From fd7c0c9ff591ce11c6cde7ca84e5632f272e07f2 Mon Sep 17 00:00:00 2001 From: Jean Legrais Date: Wed, 5 Oct 2022 13:39:38 -0400 Subject: [PATCH] first version --- docker/Dockerfile | 3 +- .../base_class/facebook_config.py | 90 ------ .../base_class/facebook_table_config.py | 27 -- .../base_class/source_config.py | 2 +- .../base_class/source_table_config.py | 4 +- gcp_airflow_foundations/enums/facebook.py | 31 -- .../operators/facebook/__init__.py | 0 .../operators/facebook/hooks/__init__.py | 0 .../operators/facebook/hooks/ads.py | 301 ------------------ .../operators/facebook/operators/__init__.py | 0 .../facebook/operators/facebook_ads_to_gcs.py | 241 -------------- .../source_class/__init__.py | 3 +- gcp_airflow_foundations/version.py | 2 +- helpers/scripts/fernet.py | 0 requirements.txt | 1 - setup.py | 4 +- 16 files changed, 11 insertions(+), 698 deletions(-) delete mode 100644 gcp_airflow_foundations/base_class/facebook_config.py delete mode 100644 gcp_airflow_foundations/base_class/facebook_table_config.py delete mode 100644 gcp_airflow_foundations/enums/facebook.py delete mode 100644 gcp_airflow_foundations/operators/facebook/__init__.py delete mode 100644 gcp_airflow_foundations/operators/facebook/hooks/__init__.py delete mode 100644 gcp_airflow_foundations/operators/facebook/hooks/ads.py delete mode 100644 gcp_airflow_foundations/operators/facebook/operators/__init__.py delete mode 100644 gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py mode change 100644 => 100755 helpers/scripts/fernet.py diff --git a/docker/Dockerfile b/docker/Dockerfile index e48f3d1f..19f86ceb 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,7 +15,7 @@ ENV PATH $PATH:/home/airflow/google-cloud-sdk/bin RUN mkdir /opt/airflow/gcp_airflow_foundations # Copying only files essential for installing gcp_airflow_foundations -COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-ci.txt /opt/airflow/ +COPY setup.py setup.cfg README.md MANIFEST.in requirements.txt requirements-providers.txt requirements-facebook.txt docker/gcp_airflow_foundations_facebook-0.0.1-py2.py3-none-any.whl requirements-ci.txt /opt/airflow/ #COPY tests /opt/airflow/tests COPY gcp_airflow_foundations/__init__.py /opt/airflow/gcp_airflow_foundations/__init__.py COPY gcp_airflow_foundations/version.py /opt/airflow/gcp_airflow_foundations/version.py @@ -27,6 +27,7 @@ RUN pip install --upgrade pip # - Install dependencies RUN pip install -e .[providers,test] +RUN pip install gcp_airflow_foundations_facebook-0.0.1-py2.py3-none-any.whl # USER airflow diff --git a/gcp_airflow_foundations/base_class/facebook_config.py b/gcp_airflow_foundations/base_class/facebook_config.py deleted file mode 100644 index 1423e3ab..00000000 --- a/gcp_airflow_foundations/base_class/facebook_config.py +++ /dev/null @@ -1,90 +0,0 @@ -from typing import List, Optional -from datetime import datetime - -from pydantic import validator, root_validator -from pydantic.dataclasses import dataclass - -from facebook_business.adobjects.adsinsights import AdsInsights - -from gcp_airflow_foundations.enums.facebook import ( - Level, - DatePreset, - AccountLookupScope, - ApiObject, -) - -valid_fields = { - "account_name": AdsInsights.Field.account_name, - "account_id": AdsInsights.Field.account_id, - "attribution_setting": AdsInsights.Field.attribution_setting, - "account_currency": AdsInsights.Field.account_currency, - "campaign_name": AdsInsights.Field.campaign_name, - "campaign_id": AdsInsights.Field.campaign_id, - "adset_name": AdsInsights.Field.adset_name, - "adset_id": AdsInsights.Field.adset_id, - "ad_name": AdsInsights.Field.ad_name, - "ad_id": AdsInsights.Field.ad_id, - "impressions": AdsInsights.Field.impressions, - "spend": AdsInsights.Field.spend, - "reach": AdsInsights.Field.reach, - "clicks": AdsInsights.Field.clicks, - "cpc": AdsInsights.Field.cpc, - "ctr": AdsInsights.Field.ctr, - "cpm": AdsInsights.Field.cpm, - "unique_clicks": AdsInsights.Field.unique_clicks, - "inline_link_clicks": AdsInsights.Field.inline_link_clicks, - "unique_inline_link_click_ctr": AdsInsights.Field.unique_inline_link_click_ctr, - "inline_link_click_ctr": AdsInsights.Field.inline_link_click_ctr, - "unique_inline_link_clicks": AdsInsights.Field.unique_inline_link_clicks, - "cost_per_unique_inline_link_click": AdsInsights.Field.cost_per_unique_inline_link_click, - "cost_per_unique_outbound_click": AdsInsights.Field.cost_per_unique_outbound_click, - "cost_per_unique_click": AdsInsights.Field.cost_per_unique_click, - "cost_per_thruplay": AdsInsights.Field.cost_per_thruplay, - "video_30_sec_watched_actions": AdsInsights.Field.video_30_sec_watched_actions, - "video_p25_watched_actions": AdsInsights.Field.video_p25_watched_actions, - "video_p50_watched_actions": AdsInsights.Field.video_p50_watched_actions, - "video_p75_watched_actions": AdsInsights.Field.video_p75_watched_actions, - "video_p100_watched_actions": AdsInsights.Field.video_p100_watched_actions, - "video_play_actions": AdsInsights.Field.video_play_actions, - "conversion_values": AdsInsights.Field.conversion_values, - "conversions": AdsInsights.Field.conversions, - "cost_per_conversion": AdsInsights.Field.cost_per_conversion, - "actions": AdsInsights.Field.actions, - "action_values": AdsInsights.Field.action_values, - "cost_per_action_type": AdsInsights.Field.cost_per_action_type, -} - - -@dataclass -class FacebookConfig: - """ - Attributes: - fields: A list of dimensions and metrics for the Facebook Graph API. For more information see: https://developers.facebook.com/docs/marketing-api/insights/parameters/v12.0 - level: Represents the level of result {ad, adset, campaign, account} - account_lookup_scope: Whether to query all accounts managed by the user or only the active ones - account_bq_table: A BigQuery table with the account_id's - time_increment: The time dimension of the results - time_range: The time range used to query the Graph API - use_account_attribution_setting: When this parameter is set to true, your ads results will be shown using the attribution settings defined for the ad account. - use_unified_attribution_setting: When this parameter is set to true, your ads results will be shown using unified attribution settings defined at ad set level and parameter - """ - - fields: Optional[List[str]] - level: Optional[Level] - account_lookup_scope: AccountLookupScope - accounts_bq_table: Optional[str] - time_increment: Optional[str] - time_range: Optional[dict] - use_account_attribution_setting: Optional[bool] = False - use_unified_attribution_setting: Optional[bool] = False - - @validator("fields") - def valid_fields(cls, v): - if v is not None: - for field in v: - assert ( - field in valid_fields - ), f"`{field}` is not a valid field for the Facebook API" - return [valid_fields[field] for field in v] - else: - return [] diff --git a/gcp_airflow_foundations/base_class/facebook_table_config.py b/gcp_airflow_foundations/base_class/facebook_table_config.py deleted file mode 100644 index c29324c8..00000000 --- a/gcp_airflow_foundations/base_class/facebook_table_config.py +++ /dev/null @@ -1,27 +0,0 @@ -from dacite import Config -from dataclasses import dataclass, field -from pydantic import validator -from typing import List, Optional - -from gcp_airflow_foundations.enums.facebook import ApiObject - - -@dataclass -class FacebookTableConfig: - """ - Attributes: - api_object: The API object to query {insights, campaign, adset} - breakdowns: How to break down the result. For more than one breakdown, only certain combinations are available. - action_breakdowns: How to break down action results. Supports more than one breakdowns. Default value is ["action_type"]. - """ - - api_object: Optional[ApiObject] - breakdowns: Optional[List[str]] - action_breakdowns: Optional[List[str]] - - @validator("api_object") - def valid_fields(cls, v): - if v is None: - return ApiObject.INSIGHTS - else: - return v diff --git a/gcp_airflow_foundations/base_class/source_config.py b/gcp_airflow_foundations/base_class/source_config.py index 23f27034..9969ddc5 100644 --- a/gcp_airflow_foundations/base_class/source_config.py +++ b/gcp_airflow_foundations/base_class/source_config.py @@ -7,7 +7,7 @@ from gcp_airflow_foundations.enums.source_type import SourceType from gcp_airflow_foundations.base_class.landing_zone_config import LandingZoneConfig from gcp_airflow_foundations.base_class.schema_options_config import SchemaOptionsConfig -from gcp_airflow_foundations.base_class.facebook_config import FacebookConfig +from gcp_airflow_foundations_facebook.base_class.facebook_config import FacebookConfig from gcp_airflow_foundations.base_class.dlp_source_config import DlpSourceConfig from gcp_airflow_foundations.base_class.source_ingestion_config import FullIngestionConfig diff --git a/gcp_airflow_foundations/base_class/source_table_config.py b/gcp_airflow_foundations/base_class/source_table_config.py index cf0bf2a5..9d2e3f25 100644 --- a/gcp_airflow_foundations/base_class/source_table_config.py +++ b/gcp_airflow_foundations/base_class/source_table_config.py @@ -19,8 +19,8 @@ from gcp_airflow_foundations.base_class.source_base_config import SourceBaseConfig from gcp_airflow_foundations.base_class.ods_table_config import OdsTableConfig from gcp_airflow_foundations.base_class.hds_table_config import HdsTableConfig -from gcp_airflow_foundations.base_class.facebook_table_config import FacebookTableConfig -from gcp_airflow_foundations.enums.facebook import ApiObject +from gcp_airflow_foundations_facebook.base_class.facebook_table_config import FacebookTableConfig +from gcp_airflow_foundations_facebook.enums.facebook import ApiObject from gcp_airflow_foundations.base_class.column_udf_config import ColumnUDFConfig diff --git a/gcp_airflow_foundations/enums/facebook.py b/gcp_airflow_foundations/enums/facebook.py deleted file mode 100644 index 22160f2f..00000000 --- a/gcp_airflow_foundations/enums/facebook.py +++ /dev/null @@ -1,31 +0,0 @@ -from enum import Enum, unique - - -@unique -class Level(Enum): - AD = "ad" - ADSET = "adset" - CAMPAIGN = "campaign" - ACCOUNT = "account" - - -@unique -class DatePreset(Enum): - TODAY = "today" - YESTERDAY = "yesterday" - THIS_MONTH = "this_month" - LAST_MONTH = "last_month" - MAXIMUM = "maximum" - - -@unique -class AccountLookupScope(Enum): - ALL = "all" - ACTIVE = "active" - - -@unique -class ApiObject(Enum): - INSIGHTS = "INSIGHTS" - CAMPAIGNS = "CAMPAIGNS" - ADSETS = "ADSETS" diff --git a/gcp_airflow_foundations/operators/facebook/__init__.py b/gcp_airflow_foundations/operators/facebook/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/gcp_airflow_foundations/operators/facebook/hooks/__init__.py b/gcp_airflow_foundations/operators/facebook/hooks/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/gcp_airflow_foundations/operators/facebook/hooks/ads.py b/gcp_airflow_foundations/operators/facebook/hooks/ads.py deleted file mode 100644 index 030a3348..00000000 --- a/gcp_airflow_foundations/operators/facebook/hooks/ads.py +++ /dev/null @@ -1,301 +0,0 @@ -import time -from typing import Any, Dict, List, Optional -from enum import Enum -from datetime import datetime -import requests -import json - -from airflow.exceptions import AirflowException -from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook - -from facebook_business.api import FacebookAdsApi -from facebook_business.adobjects.adreportrun import AdReportRun -from facebook_business.adobjects.adaccount import AdAccount -from facebook_business.adobjects.adsinsights import AdsInsights - -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook -from google.cloud import bigquery - - -class JobStatus(Enum): - """Available options for facebook async task status""" - - COMPLETED = "Job Completed" - STARTED = "Job Started" - RUNNING = "Job Running" - FAILED = "Job Failed" - SKIPPED = "Job Skipped" - - -class CustomFacebookAdsReportingHook(FacebookAdsReportingHook): - """ - Custom Hook for the Facebook Ads API. It extends the default FacebookAdsReportingHook. - - :param facebook_conn_id: Airflow Facebook Ads connection ID - :type facebook_conn_id: str - :param api_version: The version of Facebook API. Default to None. If it is None, - it will use the Facebook business SDK default version. - :type api_version: Optional[str] - """ - - conn_name_attr = "facebook_conn_id" - default_conn_name = "facebook_custom" - - def __init__( - self, - facebook_conn_id: str = default_conn_name, - api_version: Optional[str] = None, - **kwargs, - ) -> None: - super(CustomFacebookAdsReportingHook, self).__init__( - facebook_conn_id=facebook_conn_id, api_version=api_version, **kwargs - ) - - self.facebook_conn_id = facebook_conn_id - self.api_version = api_version - self.client_required_fields = ["app_id", "app_secret", "access_token"] - self.config = self.facebook_ads_config - - def _get_service(self, facebook_acc_id) -> FacebookAdsApi: - """Returns Facebook Ads Client using a service account""" - - return FacebookAdsApi.init( - app_id=self.config["app_id"], - app_secret=self.config["app_secret"], - access_token=self.config["access_token"], - account_id=facebook_acc_id, - api_version=self.api_version, - ) - - def bulk_facebook_report_async( - self, - facebook_acc_id: str, - params: Dict[str, Any], - fields: List[str], - sleep_time: int = 5, - ) -> List[AdsInsights]: - """ - Pulls data from the Facebook Ads API using async calls. - :param facebook_acc_id: The Facebook account ID to pull data from. - :type facebook_acc_id: str - :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: List[str] - :param params: Parameters that determine the query for Facebook - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: Dict[str, Any] - :param sleep_time: Time to sleep when async call is happening - :type sleep_time: int - :return: Facebook Ads API response, converted to rows. - :rtype: List[dict] - """ - - api = self._get_service(facebook_acc_id=facebook_acc_id) - ad_account = AdAccount(api.get_default_account_id(), api=api) - _async = ad_account.get_insights(params=params, fields=fields, is_async=True) - while True: - request = _async.api_get() - async_status = request[AdReportRun.Field.async_status] - percent = request[AdReportRun.Field.async_percent_completion] - self.log.info( - "%s %s completed, async_status: %s", percent, "%", async_status - ) - if async_status == JobStatus.COMPLETED.value: - self.log.info("Job run completed") - break - if async_status in [JobStatus.SKIPPED.value, JobStatus.FAILED.value]: - message = f"{async_status}. Please retry." - raise AirflowException(message) - time.sleep(sleep_time) - report_run_id = _async.api_get()["report_run_id"] - report_object = AdReportRun(report_run_id, api=api) - insights = report_object.get_insights() - - max_current_usage = self.usage_throttle(insights) - - if max_current_usage >= 75: - return -1 - - self.log.info("Extracting data from returned Facebook Ads Iterators") - - rows = [] - while True: - max_current_usage = self.usage_throttle(insights) - if max_current_usage >= 75: - self.log.info("75% Rate Limit Reached. Cooling Time 5 Minutes.") - time.sleep(300) - try: - rows.append(next(insights)) - except StopIteration: - break - - return [dict(row) for row in rows] - - def bulk_facebook_report( - self, - facebook_acc_id: str, - params: Dict[str, Any], - fields: List[str], - sleep_time: int = 5, - ) -> List[AdsInsights]: - """ - Pulls data from the Facebook Ads API using sync calls. - :param facebook_acc_id: The Facebook account ID to pull data from. - :type facebook_acc_id: str - :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: List[str] - :param params: Parameters that determine the query for Facebook - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: Dict[str, Any] - :return: Facebook Ads API response, converted to rows. - :rtype: List[dict] - """ - - api = self._get_service(facebook_acc_id=facebook_acc_id) - ad_account = AdAccount(api.get_default_account_id(), api=api) - insights = ad_account.get_insights(params=params, fields=fields, is_async=False) - rows = list(insights) - - self.usage_throttle(insights) - - return [dict(row) for row in rows] - - def get_campaigns(self, facebook_acc_id: str, params: Dict[str, Any]) -> List[dict]: - """ - Pulls campaign data from the Facebook Ads API using sync calls. - :param facebook_acc_id: The Facebook account ID to pull data from. - :type facebook_acc_id: str - :param params: Parameters that determine the query for Facebook - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type params: Dict[str, Any] - :return: Facebook Ads API response, converted to rows. - :rtype: List[dict] - """ - - api = self._get_service(facebook_acc_id=facebook_acc_id) - ad_account = AdAccount(api.get_default_account_id(), api=api) - - campaigns = ad_account.get_campaigns( - params={"limit": "20000", "time_range": params["time_range"]}, - fields=[ - "account_id", - # 'name', TO-DO: troubleshoot why pyarrow fails to convert the `name` column - "daily_budget", - "effective_status", - "lifetime_budget", - "start_time", - "stop_time", - ], - ) - - rows = [] - for row in campaigns: - converted_row = row._data - if "name" in converted_row: - converted_row["name"] = str(converted_row["name"]) - if "start_time" in converted_row: - converted_row["start_time"] = datetime.strptime( - converted_row["start_time"], "%Y-%m-%dT%H:%M:%S%z" - ) - if "stop_time" in row: - converted_row["stop_time"] = datetime.strptime( - converted_row["stop_time"], "%Y-%m-%dT%H:%M:%S%z" - ) - rows.append(converted_row) - - return rows - - def get_adsets(self, facebook_acc_id: str, params: Dict[str, Any]) -> List[dict]: - """ - Pulls adset data from the Facebook Ads API using sync calls. - :param facebook_acc_id: The Facebook account ID to pull data from. - :type facebook_acc_id: str - :param params: Parameters that determine the query for Facebook - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: Dict[str, Any] - :return: Facebook Ads API response, converted to rows. - :rtype: List[dict] - """ - - api = self._get_service(facebook_acc_id=facebook_acc_id) - ad_account = AdAccount(api.get_default_account_id(), api=api) - - adsets = ad_account.get_ad_sets( - params={"limit": "20000", "time_range": params["time_range"]}, - fields=[ - "account_id", - "name", - "campaign_id", - "daily_budget", - "effective_status", - "lifetime_budget", - "created_time", - "end_time", - ], - ) - - rows = [] - for row in adsets: - converted_row = row._data - if "name" in converted_row: - converted_row["name"] = str(converted_row["name"]) - if "created_time" in converted_row: - converted_row["created_time"] = datetime.strptime( - converted_row["created_time"], "%Y-%m-%dT%H:%M:%S%z" - ) - if "end_time" in row: - converted_row["end_time"] = datetime.strptime( - converted_row["end_time"], "%Y-%m-%dT%H:%M:%S%z" - ) - rows.append(converted_row) - - return rows - - def get_active_accounts_from_bq(self, project_id, table_id) -> List[str]: - """ - Pulls a list of Facebook account IDs from a BigQuery table. - :param project_id: The Google Cloud Platform project ID. - :type project_id: str - :param table_id: Name of BigQuery table that contains the Facebook account IDS. - :type table_id: str - :return: A list with the Facebook account IDs. - :rtype: List[str] - """ - - sql = f"SELECT account_id FROM `{table_id}`" - - query_config = bigquery.QueryJobConfig(use_legacy_sql=False) - - client = bigquery.Client(project=project_id) - - df = client.query(sql, job_config=query_config).to_dataframe() - - return [f"act_{i}" for i in df.account_id] - - def get_all_accounts(self) -> List[str]: - """ - Pulls a list of Facebook account IDs from the Facebook API. - :return: A list with the Facebook account IDs. - :rtype: List[str] - """ - - self.log.info("Extracting all accounts") - - user_id = self.config["user_id"] - - URL = f"https://graph.facebook.com/v12.0/{user_id}/adaccounts" - params = {"access_token": self.config["access_token"], "limit": 10000} - - accounts = requests.get(URL, params=params).json()["data"] - - return [i["id"] for i in accounts] - - def usage_throttle(self, insights) -> int: - """ - Queries the 'x-business-use-case-usage' header of the Cursor object returned by the Facebook API. - """ - - usage_header = json.loads(insights._headers["x-business-use-case-usage"]) - values = list(usage_header.values())[0][0] - return max(values["call_count"], values["total_cputime"], values["total_time"]) diff --git a/gcp_airflow_foundations/operators/facebook/operators/__init__.py b/gcp_airflow_foundations/operators/facebook/operators/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py deleted file mode 100644 index b03f7391..00000000 --- a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py +++ /dev/null @@ -1,241 +0,0 @@ -import csv -import tempfile -import warnings -import time -from typing import Any, Dict, List, Optional, Sequence, Union -from random import shuffle -import pandas as pd -from datetime import datetime -from dateutil.relativedelta import relativedelta - -import pyarrow.parquet as pq -import pyarrow - -from airflow.exceptions import AirflowException - -from gcp_airflow_foundations.operators.facebook.hooks.ads import ( - CustomFacebookAdsReportingHook, -) -from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject - -from airflow.models import BaseOperator, Variable -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook - -from google.cloud import bigquery - - -class FacebookAdsReportToBqOperator(BaseOperator): - """ - Fetches the results from the Facebook Ads API as desired in the params and fields - Converts to parquet format and loads to directly BigQuery maintaining the native nested - representation of the data. - - :param api_object: The API Object to query from - :type api_object: ApiObject - :param gcp_project: The Google Cloud Platform project ID - :type gcp_project: str - :param account_lookup_scope: Whether to query all or only the active accounts managed by the user. - :type account_lookup_scope: AccountLookupScope - :param destination_project_dataset_table: BigQuery staging zone table. String in dotted (.). format. - :type destination_project_dataset_table: str - :param accounts_bq_table: BigQuery table with the Facebook Account IDs to query data from. String in dotted (.).
format. - :type accounts_bq_table: str - :param time_range: Time range used in the Graph API query. - :type time_range: Dict[str, Any] - :param gcp_conn_id: Airflow Google Cloud connection ID - :type gcp_conn_id: str - :param facebook_conn_id: Airflow Facebook Ads connection ID - :type facebook_conn_id: str - :param api_version: The version of Facebook API. Default to None. If it is None, - it will use the Facebook business SDK default version. - :type api_version: str - :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type fields: List[str] - :param parameters: Parameters that determine the query for Facebook - https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 - :type parameters: Dict[str, Any] - :param impersonation_chain: Optional service account to impersonate using short-term - credentials, or chained list of accounts required to get the access_token - of the last account in the list, which will be impersonated in the request. - If set as a string, the account must grant the originating account - the Service Account Token Creator IAM role. - If set as a sequence, the identities from the list must grant - Service Account Token Creator IAM role to the directly preceding identity, with first - account from the list granting this role to the originating account (templated). - :type impersonation_chain: Union[str, Sequence[str]] - """ - - template_fields = ("facebook_conn_id", "impersonation_chain", "parameters") - - def __init__( - self, - *, - api_object: ApiObject, - gcp_project: str, - account_lookup_scope: AccountLookupScope, - destination_project_dataset_table: str, - accounts_bq_table: str, - fields: List[str], - parameters: Dict[str, Any] = None, - time_range: Dict[str, Any] = None, - api_version: Optional[str] = None, - gcp_conn_id: str = "google_cloud_default", - facebook_conn_id: str = "facebook_custom", - impersonation_chain: Optional[Union[str, Sequence[str]]] = None, - **kwargs, - ) -> None: - super(FacebookAdsReportToBqOperator, self).__init__(**kwargs) - - self.api_object = api_object - self.gcp_project = gcp_project - self.account_lookup_scope = account_lookup_scope - self.destination_project_dataset_table = destination_project_dataset_table - self.accounts_bq_table = accounts_bq_table - self.gcp_conn_id = gcp_conn_id - self.facebook_conn_id = facebook_conn_id - self.api_version = api_version - self.fields = fields - self.parameters = parameters - self.time_range = time_range - self.impersonation_chain = impersonation_chain - - def execute(self, context: dict): - - ds = context["ds"] - - if not self.time_range: - self.parameters["time_range"] = {"since": ds, "until": ds} - else: - self.parameters["time_range"] = { - "since": self.time_range["since"], - "until": ds, - } - - self.log.info( - "Currently loading data for date range: %s", self.parameters["time_range"] - ) - - service = CustomFacebookAdsReportingHook( - facebook_conn_id=self.facebook_conn_id, api_version=self.api_version - ) - - if self.account_lookup_scope == AccountLookupScope.ALL: - facebook_acc_ids = service.get_all_accounts() - - elif self.account_lookup_scope == AccountLookupScope.ACTIVE: - facebook_acc_ids = service.get_active_accounts_from_bq( - project_id=self.gcp_project, table_id=self.accounts_bq_table - ) - - shuffle(facebook_acc_ids) - - converted_rows = [] - while True: - for facebook_acc_id in facebook_acc_ids: - - self.log.info( - "Currently loading data from Account ID: %s", facebook_acc_id - ) - - try: - if self.api_object == ApiObject.INSIGHTS: - rows = service.bulk_facebook_report_async( - facebook_acc_id=facebook_acc_id, - params=self.parameters, - fields=self.fields, - ) - if rows == -1: - self.log.info( - "Rate Limit has reached 75%. Moving on to the next account. Will retry later" - ) - continue - - elif self.api_object == ApiObject.CAMPAIGNS: - rows = service.get_campaigns( - facebook_acc_id=facebook_acc_id, params=self.parameters - ) - - elif self.api_object == ApiObject.ADSETS: - rows = service.get_adsets( - facebook_acc_id=facebook_acc_id, params=self.parameters - ) - - converted_rows.extend(rows) - - facebook_acc_ids.remove(facebook_acc_id) - - self.log.info( - "Extracting data for account %s completed", facebook_acc_id - ) - except: # noqa: E722 - self.log.info( - "Extracting data for account %s failed. Will retry later.", - facebook_acc_id, - ) - - if len(facebook_acc_ids) == 0: - break - - self.log.info("Facebook Returned %s data points", len(converted_rows)) - - self.transform_data_types(converted_rows) - - df = pd.DataFrame.from_dict(converted_rows) - - writer = pyarrow.BufferOutputStream() - pq.write_table( - pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True - ) - reader = pyarrow.BufferReader(writer.getvalue()) - - hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id) - - client = hook.get_client(project_id=self.gcp_project) - - parquet_options = bigquery.format_options.ParquetOptions() - parquet_options.enable_list_inference = True - - job_config = bigquery.LoadJobConfig() - job_config.source_format = bigquery.SourceFormat.PARQUET - job_config.parquet_options = parquet_options - job_config.write_disposition = "WRITE_TRUNCATE" - - client.load_table_from_file( - reader, - f"{self.destination_project_dataset_table}_{ds}", - job_config=job_config, - ) - - def transform_data_types(self, rows): - """ - Transforms the fields returned by the Facebook API to float or date data types as appropriate. - - :param rows: List of dictionary rows returned by the Facebook API. - :type rows: List[dict] - """ - for i in rows: - i.pop("date_stop") - i["date_start"] = datetime.strptime(i["date_start"], "%Y-%m-%d").date() - for j in i: - if j.endswith("id") or j.endswith("name"): - continue - elif type(i[j]) == str: - i[j] = self.get_float(i[j]) - elif type(i[j]) == list: - for k in i[j]: - for w in k: - if (type(k[w]) == str) and (not w.endswith("id")): - k[w] = self.get_float(k[w]) - - def get_float(self, element): - """ - Attempts to cast a string object into float. - - :param element: Value to be converted to float. - :type element: str - """ - try: - return float(element) - except ValueError: - return element diff --git a/gcp_airflow_foundations/source_class/__init__.py b/gcp_airflow_foundations/source_class/__init__.py index 292a8ca7..653181d0 100644 --- a/gcp_airflow_foundations/source_class/__init__.py +++ b/gcp_airflow_foundations/source_class/__init__.py @@ -7,7 +7,7 @@ from gcp_airflow_foundations.source_class.oracle_dataflow_source import ( OracleToBQDataflowDagBuilder, ) -from gcp_airflow_foundations.source_class.facebook import FacebooktoBQDagBuilder + from gcp_airflow_foundations.base_class.data_source_table_config import ( DataSourceTablesConfig, @@ -30,6 +30,7 @@ def get_dag_builder( logging.info("Selecting Oracle builder") return OracleToBQDataflowDagBuilder(default_task_args, config) elif source == SourceType.FACEBOOK: + from gcp_airflow_foundations_facebook.source_class.facebook import FacebooktoBQDagBuilder logging.info("Selecting Facebook builder") return FacebooktoBQDagBuilder(default_task_args, config) else: diff --git a/gcp_airflow_foundations/version.py b/gcp_airflow_foundations/version.py index a8d4557d..d7b30e12 100644 --- a/gcp_airflow_foundations/version.py +++ b/gcp_airflow_foundations/version.py @@ -1 +1 @@ -__version__ = "0.3.5" +__version__ = "0.3.6" diff --git a/helpers/scripts/fernet.py b/helpers/scripts/fernet.py old mode 100644 new mode 100755 diff --git a/requirements.txt b/requirements.txt index 3b681c37..e00cadac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ pydantic==1.8.2 -facebook_business>=10.0.0 dacite>=1.5.0 regex>=2021.11.1 twilio diff --git a/setup.py b/setup.py index 5e1351ef..56180e25 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,8 @@ requirements_providers = f.read().strip().split("\n") with open(os.path.join(here, "requirements-ci.txt"), "r") as f: requirements_test = f.read().strip().split("\n") +with open(os.path.join(here, "requirements-facebook.txt"), "r") as f: + requirements_facebook = f.read().strip().split("\n") this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() @@ -44,7 +46,7 @@ def main(): license="Apache 2.0", packages=packages, install_requires=requirements, - extras_require={'providers': requirements_providers, 'test': requirements_test}, + extras_require={"providers": requirements_providers, "test": requirements_test, "facebook":requirements_facebook}, classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers",