From d257ad335a2393d7c3380ccbebbc401c7603ee00 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Wed, 17 Jul 2024 16:27:55 +0300
Subject: [PATCH] Include organisation files

---
 iatidata/__init__.py            | 195 +++++++++++++++++---------------
 iatidata/tests/test_iatidata.py |   2 +-
 2 files changed, 103 insertions(+), 94 deletions(-)

diff --git a/iatidata/__init__.py b/iatidata/__init__.py
index 8a9cd27..458cd99 100644
--- a/iatidata/__init__.py
+++ b/iatidata/__init__.py
@@ -28,7 +28,7 @@
 from google.cloud.bigquery.dataset import AccessEntry
 from google.oauth2 import service_account
 from lxml import etree
-from sqlalchemy import Engine, Row, column, create_engine, insert, table, text
+from sqlalchemy import Engine, column, create_engine, insert, table, text
 
 from iatidata import sort_iati
 
@@ -82,30 +82,40 @@ def create_table(table, sql, **params):
     _create_table(table.lower(), con, sql, **params)
 
 
-def create_activities_table():
-    logger.debug("Creating table: _all_activities")
+def create_raw_tables():
     engine = get_engine()
     with engine.begin() as connection:
-        connection.execute(
-            text(
-                """
-        DROP TABLE IF EXISTS _all_activities;
-        CREATE TABLE _all_activities(
-            id SERIAL, prefix TEXT, dataset TEXT, filename TEXT, error TEXT, version TEXT, activity JSONB
-        );
-        """
+        for filetype in ["activity", "organisation"]:
+            table_name = f"_raw_{filetype}"
+            logger.debug(f"Creating table: {table_name}")
+            connection.execute(
+                text(
+                    f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {table_name}(
+                        id SERIAL, prefix TEXT, dataset TEXT, filename TEXT, error TEXT, version TEXT, object JSONB
+                    );
+                    """
+                )
             )
-        )
 
 
 @functools.lru_cache
-def get_activities_schema() -> xmlschema.XMLSchema10:
-    return xmlschema.XMLSchema(
-        str(
-            pathlib.Path()
-            / "__iatikitcache__/standard/schemas/203/iati-activities-schema.xsd"
+def get_xml_schema(filetype: str) -> xmlschema.XMLSchema10:
+    if filetype == "activity":
+        return xmlschema.XMLSchema(
+            str(
+                pathlib.Path()
+                / "__iatikitcache__/standard/schemas/203/iati-activities-schema.xsd"
+            )
+        )
+    else:
+        return xmlschema.XMLSchema(
+            str(
+                pathlib.Path()
+                / "__iatikitcache__/standard/schemas/203/iati-organisations-schema.xsd"
+            )
         )
-    )
 
 
 def get_standard(refresh=False):
@@ -238,7 +248,7 @@ def get_codelists_lookup():
             ALL_CODELIST_LOOKUP[(path, codelist_value)] = value_name
 
 
-def parse_activities_from_dataset(
+def parse_dataset(
     dataset: iatikit.Dataset,
 ) -> Iterator[tuple[dict[str, Any], list[xmlschema.XMLSchemaValidationError]]]:
     try:
@@ -247,29 +257,22 @@ def parse_activities_from_dataset(
         dataset_etree = dataset.etree.getroot()
     except Exception:
         logger.debug(f"Error parsing XML for dataset '{dataset.name}'")
         return
 
-    transform = etree.XSLT(etree.parse(str(this_dir / "iati-activities.xsl")))
-    schema = get_activities_schema()
-
-    schema_dict = get_sorted_schema_dict()
-
-    for activity in dataset_etree.findall("iati-activity"):
+    if dataset.filetype == "activity":
+        transform = etree.XSLT(etree.parse(str(this_dir / "iati-activities.xsl")))
         version = dataset_etree.get("version", "1.01")
-
-        activities = etree.Element("iati-activities", version=version)
-        activities.append(activity)
         if version.startswith("1"):
-            activities = transform(activities).getroot()
-
-        sort_iati_element(activities[0], schema_dict)
+            dataset_etree = transform(dataset_etree).getroot()
+
+    for child_element in dataset_etree.findall(f"iati-{dataset.filetype}"):
+        sort_iati_element(child_element, get_sorted_schema_dict())
 
         xmlschema_to_dict_result: tuple[dict[str, Any], list[Any]] = xmlschema.to_dict(
-            activities, schema=schema, validation="lax", decimal_type=float  # type: ignore
+            child_element,  # type: ignore
+            schema=get_xml_schema(dataset.filetype),
+            validation="lax",
+            decimal_type=float,
         )
-
-        activities_dict, error = xmlschema_to_dict_result
-        activity_dict = activities_dict.get("iati-activity", [{}])[0]
-
-        yield activity_dict, error
+        parsed_dict, error = xmlschema_to_dict_result
+        yield parsed_dict, error
 
 
 def csv_file_to_db(csv_fd):
@@ -282,9 +285,6 @@ def csv_file_to_db(csv_fd):
 
 
 def load_dataset(dataset: iatikit.Dataset) -> None:
-    if dataset.filetype != "activity":
-        return
-
     if not dataset.data_path:
         logger.warn(f"Dataset '{dataset}' not found")
         return
@@ -296,13 +296,13 @@ def load_dataset(dataset: iatikit.Dataset) -> None:
         connection.execute(
             insert(
                 table(
-                    "_all_activities",
+                    f"_raw_{dataset.filetype}",
                     column("prefix"),
                     column("dataset"),
                     column("filename"),
                     column("error"),
                     column("version"),
-                    column("activity"),
+                    column("object"),
                 )
             ).values(
                 [
@@ -314,19 +314,19 @@ def load_dataset(dataset: iatikit.Dataset) -> None:
                            [f"{error.reason} at {error.path}" for error in errors]
                         ),
                         "version": dataset.version,
-                        "activity": json.dumps(activity),
+                        "object": json.dumps(object),
                     }
-                    for activity, errors in parse_activities_from_dataset(dataset)
+                    for object, errors in parse_dataset(dataset)
                 ]
             )
         )
 
 
 def load(processes: int, sample: Optional[int] = None) -> None:
-    create_activities_table()
+    create_raw_tables()
 
     logger.info(
-        f"Loading {len(list(islice(iatikit.data().datasets, sample)))} datasets"
+        f"Loading {len(list(islice(iatikit.data().datasets, sample)))} datasets into database"
     )
 
     datasets_sample = islice(iatikit.data().datasets, sample)
@@ -335,7 +335,7 @@ def load(processes: int, sample: Optional[int] = None) -> None:
             executor.submit(load_dataset, dataset) for dataset in datasets_sample
         ]
         concurrent.futures.wait(futures)
-    logger.info("Finished loading registry data into database")
+    logger.info("Finished loading datasets into database")
 
 
 def process_registry() -> None:
@@ -351,7 +351,7 @@ def process_registry() -> None:
         )
     )
 
-    activity_objects()
+    raw_objects()
     schema_analysis()
     postgres_tables()
     sql_process()
@@ -385,7 +385,7 @@ def flatten_object(obj, current_path="", no_index_path=tuple()):
 
 @functools.lru_cache(1000)
 def path_info(
-    full_path: tuple[str | int, ...], no_index_path: tuple[str, ...]
+    full_path: tuple[str | int, ...], no_index_path: tuple[str, ...], filetype: str
 ) -> tuple[str, list[str], list[str], str, tuple[dict[str, str], ...]]:
     all_paths = []
     for num, part in enumerate(full_path):
@@ -403,7 +403,7 @@ def path_info(
         "_".join(str(key) for key in parent_path if not isinstance(key, int))
         for parent_path in parent_paths
     ]
-    object_type = "_".join(str(key) for key in no_index_path) or "activity"
+    object_type = "_".join(str(key) for key in no_index_path) or filetype
 
     parent_keys = (dict(zip(parent_keys_no_index, parent_keys_list)),)
     return object_key, parent_keys_list, parent_keys_no_index, object_type, parent_keys
@@ -458,42 +458,47 @@ def traverse_object(
 
 
 def create_rows(
-    id: int, dataset: str, prefix: str, activity: dict[str, Any]
+    id: int, dataset: str, prefix: str, original_object: dict[str, Any], filetype: str
 ) -> list[list[Any]]:
     rows = []
 
-    if activity is None:
+    if original_object is None:
         return []
 
     # get activity dates before traversal remove them
-    activity_dates = activity.get("activity-date", []) or []
+    activity_dates = original_object.get("activity-date", []) or []
 
-    for object, full_path, no_index_path in traverse_object(activity, True):
+    for object, full_path, no_index_path in traverse_object(original_object, True):
         (
             object_key,
             parent_keys_list,
             parent_keys_no_index,
             object_type,
             parent_keys,
-        ) = path_info(full_path, no_index_path)
+        ) = path_info(full_path, no_index_path, filetype)
 
         object["_link"] = f'{id}{"." if object_key else ""}{object_key}'
-        object["_link_activity"] = str(id)
         object["dataset"] = dataset
         object["prefix"] = prefix
-        if object_type != "activity":
-            object["iatiidentifier"] = activity.get("iati-identifier")
-            reporting_org = activity.get("reporting-org", {}) or {}
-            object["reportingorg_ref"] = reporting_org.get("@ref")
-
-        if object_type == "activity":
-            for activity_date in activity_dates:
-                if not isinstance(activity_date, dict):
-                    continue
-                type = activity_date.get("@type")
-                date = activity_date.get("@iso-date")
-                if type and date and type in DATE_MAP:
-                    object[DATE_MAP[type]] = date
+
+        if filetype == "activity":
+            object["_link_activity"] = str(id)
+            if object_type == "activity":
+                for activity_date in activity_dates:
+                    if not isinstance(activity_date, dict):
+                        continue
+                    type = activity_date.get("@type")
+                    date = activity_date.get("@iso-date")
+                    if type and date and type in DATE_MAP:
+                        object[DATE_MAP[type]] = date
+            else:
+                object["iatiidentifier"] = original_object.get("iati-identifier")
+                reporting_org = original_object.get("reporting-org", {}) or {}
+                object["reportingorg_ref"] = reporting_org.get("@ref")
+        elif filetype == "organisation":
+            object["_link_organisation"] = str(id)
+            if object_type != "organisation":
+                object_type = f"organisation_{object_type}"
 
         for no_index, full in zip(parent_keys_no_index, parent_keys_list):
             object[f"_link_{no_index}"] = f"{id}.{full}"
@@ -506,6 +511,7 @@ def create_rows(
             object=json.dumps(
                 dict(flatten_object(object, no_index_path=no_index_path))
             ),
+            filetype=filetype,
         )
         rows.append(row)
 
@@ -513,22 +519,23 @@ def create_rows(
     return result
 
 
-def activity_objects() -> None:
+def raw_objects() -> None:
     get_codelists_lookup()
 
-    logger.debug("Creating table: _activity_objects")
+    logger.debug("Creating table: _raw_objects")
     engine = get_engine()
     with engine.begin() as connection:
         connection.execute(
             text(
                 """
-                DROP TABLE IF EXISTS _activity_objects;
-                CREATE TABLE _activity_objects(
+                DROP TABLE IF EXISTS _raw_objects;
+                CREATE TABLE _raw_objects(
                     id bigint,
                     object_key TEXT,
                     parent_keys JSONB,
                     object_type TEXT,
-                    object JSONB
+                    object JSONB,
+                    filetype TEXT
                 );
                 """
             )
@@ -539,30 +546,32 @@ def activity_objects() -> None:
         connection = connection.execution_options(
             stream_results=True, max_row_buffer=1000
         )
-        activity_count: Optional[Row] = connection.execute(
-            text("SELECT COUNT(*) FROM _all_activities")
-        ).first()
-        if activity_count:
-            logger.info(
-                f"Flattening {activity_count.count} activities and writing rows to CSV file"
-            )
-
         results = connection.execute(
-            text("SELECT id, dataset, prefix, activity FROM _all_activities")
+            text(
+                """
+                (SELECT id, dataset, prefix, object, 'activity' AS filetype FROM _raw_activity ORDER BY id)
+                UNION ALL
+                (SELECT id, dataset, prefix, object, 'organisation' AS filetype FROM _raw_organisation ORDER BY id)
+                """
+            )
         )
 
         paths_csv_file = tmpdirname + "/paths.csv"
         with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
             csv_writer = csv.writer(csv_file)
-            for num, (id, dataset, prefix, activity) in enumerate(results):
+            for num, (id, dataset, prefix, original_object, filetype) in enumerate(
+                results
+            ):
                 if num % 10000 == 0:
-                    logger.info(f"Processed {num} activities so far")
-                csv_writer.writerows(create_rows(id, dataset, prefix, activity))
+                    logger.debug(f"Processed {num} objects so far")
+                csv_writer.writerows(
+                    create_rows(id, dataset, prefix, original_object, filetype)
+                )
 
-    logger.info("Loading processed activities from CSV file into database")
+    logger.debug("Loading processed activities from CSV file into database")
     with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
         dbapi_conn = connection.connection
-        copy_sql = "COPY _activity_objects FROM STDIN WITH CSV"
+        copy_sql = "COPY _raw_objects FROM STDIN WITH CSV"
         cur = dbapi_conn.cursor()
         cur.copy_expert(copy_sql, f)
 
@@ -586,7 +595,7 @@ def schema_analysis():
             END value_type,
             count(*)
         FROM
-            _activity_objects ro, jsonb_each(object) each
+            _raw_objects ro, jsonb_each(object) each
         GROUP BY 1,2,3;
         """,
     )
@@ -763,12 +772,12 @@ def postgres_tables(drop_release_objects=False):
         field_sql, as_sql = create_field_sql(object_detail)
         table_sql = f"""
            SELECT {field_sql}
-           FROM _activity_objects, jsonb_to_record(object) AS x({as_sql})
+           FROM _raw_objects, jsonb_to_record(object) AS x({as_sql})
            WHERE object_type = :object_type
         """
         create_table(object_type, table_sql, object_type=object_type)
 
-    logger.info("Creating table: metadata")
+    logger.debug("Creating table: metadata")
     with get_engine().begin() as connection:
         connection.execute(
             text(
@@ -794,7 +803,7 @@ def postgres_tables(drop_release_objects=False):
 
 
 def augment_transaction():
-    logger.info("Augmenting transaction table")
+    logger.debug("Augmenting transaction table")
     with get_engine().begin() as connection:
         connection.execute(
             text(
@@ -929,7 +938,7 @@ def augment_transaction():
 
 
 def transaction_breakdown():
-    logger.info("Creating transaction_breakdown table")
+    logger.debug("Creating transaction_breakdown table")
     with get_engine().begin() as connection:
         connection.execute(
             text(
diff --git a/iatidata/tests/test_iatidata.py b/iatidata/tests/test_iatidata.py
index b4146a2..fbcc611 100644
--- a/iatidata/tests/test_iatidata.py
+++ b/iatidata/tests/test_iatidata.py
@@ -180,7 +180,7 @@ def test_path_info():
         parent_keys_no_index,
         object_type,
         parent_keys,
-    ) = path_info(full_path, no_index_path)
+    ) = path_info(full_path, no_index_path, "activity")
 
     assert object_key == "result.12.indicator.3.period.0.actual.0"
     assert parent_keys_list == [