
Commit

Include organisation files
tillywoodfield committed Jul 17, 2024
1 parent 912e0b9 commit d257ad3
Showing 2 changed files with 103 additions and 94 deletions.
195 changes: 102 additions & 93 deletions iatidata/__init__.py
@@ -28,7 +28,7 @@
from google.cloud.bigquery.dataset import AccessEntry
from google.oauth2 import service_account
from lxml import etree
from sqlalchemy import Engine, Row, column, create_engine, insert, table, text
from sqlalchemy import Engine, column, create_engine, insert, table, text

from iatidata import sort_iati

@@ -82,30 +82,40 @@ def create_table(table, sql, **params):
_create_table(table.lower(), con, sql, **params)


def create_activities_table():
logger.debug("Creating table: _all_activities")
def create_raw_tables():
engine = get_engine()
with engine.begin() as connection:
connection.execute(
text(
"""
DROP TABLE IF EXISTS _all_activities;
CREATE TABLE _all_activities(
id SERIAL, prefix TEXT, dataset TEXT, filename TEXT, error TEXT, version TEXT, activity JSONB
);
"""
for filetype in ["activity", "organisation"]:
table_name = f"_raw_{filetype}"
logger.debug(f"Creating table: {table_name}")
connection.execute(
text(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}(
id SERIAL, prefix TEXT, dataset TEXT, filename TEXT, error TEXT, version TEXT, object JSONB
);
"""
)
)
)


@functools.lru_cache
def get_activities_schema() -> xmlschema.XMLSchema10:
return xmlschema.XMLSchema(
str(
pathlib.Path()
/ "__iatikitcache__/standard/schemas/203/iati-activities-schema.xsd"
def get_xml_schema(filetype: str) -> xmlschema.XMLSchema10:
if filetype == "activity":
return xmlschema.XMLSchema(
str(
pathlib.Path()
/ "__iatikitcache__/standard/schemas/203/iati-activities-schema.xsd"
)
)
else:
return xmlschema.XMLSchema(
str(
pathlib.Path()
/ "__iatikitcache__/standard/schemas/203/iati-organisations-schema.xsd"
)
)
)


def get_standard(refresh=False):
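
A minimal usage sketch (not part of the commit) of the two new helpers above; it assumes a configured database for get_engine() and a populated __iatikitcache__ so the referenced XSD files exist locally:

    import xmlschema

    from iatidata import create_raw_tables, get_xml_schema

    # Creates the _raw_activity and _raw_organisation tables introduced here.
    create_raw_tables()

    for filetype in ("activity", "organisation"):
        # Picks the activities or organisations XSD depending on filetype.
        schema = get_xml_schema(filetype)
        assert isinstance(schema, xmlschema.XMLSchema10)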
@@ -238,7 +248,7 @@ def get_codelists_lookup():
ALL_CODELIST_LOOKUP[(path, codelist_value)] = value_name


def parse_activities_from_dataset(
def parse_dataset(
dataset: iatikit.Dataset,
) -> Iterator[tuple[dict[str, Any], list[xmlschema.XMLSchemaValidationError]]]:
try:
@@ -247,29 +257,22 @@ def parse_activities_from_dataset(
logger.debug(f"Error parsing XML for dataset '{dataset.name}'")
return

transform = etree.XSLT(etree.parse(str(this_dir / "iati-activities.xsl")))
schema = get_activities_schema()

schema_dict = get_sorted_schema_dict()

for activity in dataset_etree.findall("iati-activity"):
if dataset.filetype == "activity":
transform = etree.XSLT(etree.parse(str(this_dir / "iati-activities.xsl")))
version = dataset_etree.get("version", "1.01")

activities = etree.Element("iati-activities", version=version)
activities.append(activity)

if version.startswith("1"):
activities = transform(activities).getroot()

sort_iati_element(activities[0], schema_dict)
dataset_etree = transform(dataset_etree).getroot()

for child_element in dataset_etree.findall(f"iati-{dataset.filetype}"):
sort_iati_element(child_element, get_sorted_schema_dict())
xmlschema_to_dict_result: tuple[dict[str, Any], list[Any]] = xmlschema.to_dict(
activities, schema=schema, validation="lax", decimal_type=float # type: ignore
child_element, # type: ignore
schema=get_xml_schema(dataset.filetype),
validation="lax",
decimal_type=float,
)
activities_dict, error = xmlschema_to_dict_result
activity_dict = activities_dict.get("iati-activity", [{}])[0]

yield activity_dict, error
parsed_dict, error = xmlschema_to_dict_result
yield parsed_dict, error


def csv_file_to_db(csv_fd):
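
For illustration only, a rough way to exercise the renamed parse_dataset() on a single dataset; it assumes the iatikit registry data has already been downloaded locally:

    import iatikit

    from iatidata import parse_dataset

    # First downloaded dataset; may be an activity or an organisation file.
    dataset = next(iter(iatikit.data().datasets))

    for parsed, errors in parse_dataset(dataset):
        # One dict per <iati-activity> or <iati-organisation> element,
        # plus any schema validation errors collected under "lax" validation.
        print(dataset.filetype, len(parsed), len(errors))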
@@ -282,9 +285,6 @@ def csv_file_to_db(csv_fd):


def load_dataset(dataset: iatikit.Dataset) -> None:
if dataset.filetype != "activity":
return

if not dataset.data_path:
logger.warn(f"Dataset '{dataset}' not found")
return
@@ -296,13 +296,13 @@ def load_dataset(dataset: iatikit.Dataset) -> None:
connection.execute(
insert(
table(
"_all_activities",
f"_raw_{dataset.filetype}",
column("prefix"),
column("dataset"),
column("filename"),
column("error"),
column("version"),
column("activity"),
column("object"),
)
).values(
[
@@ -314,19 +314,19 @@ def load_dataset(dataset: iatikit.Dataset) -> None:
[f"{error.reason} at {error.path}" for error in errors]
),
"version": dataset.version,
"activity": json.dumps(activity),
"object": json.dumps(object),
}
for activity, errors in parse_activities_from_dataset(dataset)
for object, errors in parse_dataset(dataset)
]
)
)


def load(processes: int, sample: Optional[int] = None) -> None:
create_activities_table()
create_raw_tables()

logger.info(
f"Loading {len(list(islice(iatikit.data().datasets, sample)))} datasets"
f"Loading {len(list(islice(iatikit.data().datasets, sample)))} datasets into database"
)
datasets_sample = islice(iatikit.data().datasets, sample)

@@ -335,7 +335,7 @@ def load(processes: int, sample: Optional[int] = None) -> None:
executor.submit(load_dataset, dataset) for dataset in datasets_sample
]
concurrent.futures.wait(futures)
logger.info("Finished loading registry data into database")
logger.info("Finished loading datasets into database")


def process_registry() -> None:
@@ -351,7 +351,7 @@ def process_registry() -> None:
)
)

activity_objects()
raw_objects()
schema_analysis()
postgres_tables()
sql_process()
@@ -385,7 +385,7 @@ def flatten_object(obj, current_path="", no_index_path=tuple()):

@functools.lru_cache(1000)
def path_info(
full_path: tuple[str | int, ...], no_index_path: tuple[str, ...]
full_path: tuple[str | int, ...], no_index_path: tuple[str, ...], filetype: str
) -> tuple[str, list[str], list[str], str, tuple[dict[str, str], ...]]:
all_paths = []
for num, part in enumerate(full_path):
@@ -403,7 +403,7 @@ def path_info(
"_".join(str(key) for key in parent_path if not isinstance(key, int))
for parent_path in parent_paths
]
object_type = "_".join(str(key) for key in no_index_path) or "activity"
object_type = "_".join(str(key) for key in no_index_path) or filetype
parent_keys = (dict(zip(parent_keys_no_index, parent_keys_list)),)
return object_key, parent_keys_list, parent_keys_no_index, object_type, parent_keys

@@ -458,42 +458,47 @@ def traverse_object(


def create_rows(
id: int, dataset: str, prefix: str, activity: dict[str, Any]
id: int, dataset: str, prefix: str, original_object: dict[str, Any], filetype: str
) -> list[list[Any]]:
rows = []

if activity is None:
if original_object is None:
return []

# get activity dates before traversal remove them
activity_dates = activity.get("activity-date", []) or []
activity_dates = original_object.get("activity-date", []) or []

for object, full_path, no_index_path in traverse_object(activity, True):
for object, full_path, no_index_path in traverse_object(original_object, True):
(
object_key,
parent_keys_list,
parent_keys_no_index,
object_type,
parent_keys,
) = path_info(full_path, no_index_path)
) = path_info(full_path, no_index_path, filetype)

object["_link"] = f'{id}{"." if object_key else ""}{object_key}'
object["_link_activity"] = str(id)
object["dataset"] = dataset
object["prefix"] = prefix
if object_type != "activity":
object["iatiidentifier"] = activity.get("iati-identifier")
reporting_org = activity.get("reporting-org", {}) or {}
object["reportingorg_ref"] = reporting_org.get("@ref")

if object_type == "activity":
for activity_date in activity_dates:
if not isinstance(activity_date, dict):
continue
type = activity_date.get("@type")
date = activity_date.get("@iso-date")
if type and date and type in DATE_MAP:
object[DATE_MAP[type]] = date

if filetype == "activity":
object["_link_activity"] = str(id)
if object_type == "activity":
for activity_date in activity_dates:
if not isinstance(activity_date, dict):
continue
type = activity_date.get("@type")
date = activity_date.get("@iso-date")
if type and date and type in DATE_MAP:
object[DATE_MAP[type]] = date
else:
object["iatiidentifier"] = original_object.get("iati-identifier")
reporting_org = original_object.get("reporting-org", {}) or {}
object["reportingorg_ref"] = reporting_org.get("@ref")
elif filetype == "organisation":
object["_link_organisation"] = str(id)
if object_type != "organisation":
object_type = f"organisation_{object_type}"

for no_index, full in zip(parent_keys_no_index, parent_keys_list):
object[f"_link_{no_index}"] = f"{id}.{full}"
@@ -506,29 +511,31 @@ def create_rows(
object=json.dumps(
dict(flatten_object(object, no_index_path=no_index_path))
),
filetype=filetype,
)
rows.append(row)

result = [list(row.values()) for row in rows]
return result


def activity_objects() -> None:
def raw_objects() -> None:
get_codelists_lookup()

logger.debug("Creating table: _activity_objects")
logger.debug("Creating table: _raw_objects")
engine = get_engine()
with engine.begin() as connection:
connection.execute(
text(
"""
DROP TABLE IF EXISTS _activity_objects;
CREATE TABLE _activity_objects(
DROP TABLE IF EXISTS _raw_objects;
CREATE TABLE _raw_objects(
id bigint,
object_key TEXT,
parent_keys JSONB,
object_type TEXT,
object JSONB
object JSONB,
filetype TEXT
);
"""
)
@@ -539,30 +546,32 @@ def activity_objects() -> None:
connection = connection.execution_options(
stream_results=True, max_row_buffer=1000
)
activity_count: Optional[Row] = connection.execute(
text("SELECT COUNT(*) FROM _all_activities")
).first()
if activity_count:
logger.info(
f"Flattening {activity_count.count} activities and writing rows to CSV file"
)

results = connection.execute(
text("SELECT id, dataset, prefix, activity FROM _all_activities")
text(
"""
(SELECT id, dataset, prefix, object, 'activity' AS filetype FROM _raw_activity ORDER BY id)
UNION ALL
(SELECT id, dataset, prefix, object, 'organisation' AS filetype FROM _raw_organisation ORDER BY id)
"""
)
)
paths_csv_file = tmpdirname + "/paths.csv"

with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
for num, (id, dataset, prefix, activity) in enumerate(results):
for num, (id, dataset, prefix, original_object, filetype) in enumerate(
results
):
if num % 10000 == 0:
logger.info(f"Processed {num} activities so far")
csv_writer.writerows(create_rows(id, dataset, prefix, activity))
logger.debug(f"Processed {num} objects so far")
csv_writer.writerows(
create_rows(id, dataset, prefix, original_object, filetype)
)

logger.info("Loading processed activities from CSV file into database")
logger.debug("Loading processed activities from CSV file into database")
with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
dbapi_conn = connection.connection
copy_sql = "COPY _activity_objects FROM STDIN WITH CSV"
copy_sql = "COPY _raw_objects FROM STDIN WITH CSV"
cur = dbapi_conn.cursor()
cur.copy_expert(copy_sql, f)

@@ -586,7 +595,7 @@ def schema_analysis():
END value_type,
count(*)
FROM
_activity_objects ro, jsonb_each(object) each
_raw_objects ro, jsonb_each(object) each
GROUP BY 1,2,3;
""",
)
@@ -763,12 +772,12 @@ def postgres_tables(drop_release_objects=False):
field_sql, as_sql = create_field_sql(object_detail)
table_sql = f"""
SELECT {field_sql}
FROM _activity_objects, jsonb_to_record(object) AS x({as_sql})
FROM _raw_objects, jsonb_to_record(object) AS x({as_sql})
WHERE object_type = :object_type
"""
create_table(object_type, table_sql, object_type=object_type)

logger.info("Creating table: metadata")
logger.debug("Creating table: metadata")
with get_engine().begin() as connection:
connection.execute(
text(
@@ -794,7 +803,7 @@


def augment_transaction():
logger.info("Augmenting transaction table")
logger.debug("Augmenting transaction table")
with get_engine().begin() as connection:
connection.execute(
text(
@@ -929,7 +938,7 @@ def augment_transaction():


def transaction_breakdown():
logger.info("Creating transaction_breakdown table")
logger.debug("Creating transaction_breakdown table")
with get_engine().begin() as connection:
connection.execute(
text(
