diff --git a/iatidata/__init__.py b/iatidata/__init__.py
index b2d6677..3044b43 100644
--- a/iatidata/__init__.py
+++ b/iatidata/__init__.py
@@ -5,12 +5,14 @@
 import functools
 import gzip
 import json
+import logging
 import os
 import pathlib
 import re
 import shutil
 import subprocess
 import tempfile
+import time
 import zipfile
 from collections import defaultdict
 from io import StringIO
@@ -29,6 +31,14 @@
 from iatidata import sort_iati
 
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
+    datefmt="%Y-%m-%dT%H:%M:%S",
+)
+logging.Formatter.converter = time.gmtime
+logger = logging.getLogger(__name__)
+
 this_dir = pathlib.Path(__file__).parent.resolve()
 
 output_dir = os.environ.get("IATI_TABLES_OUTPUT", ".")
@@ -71,7 +81,7 @@ def create_table(table, sql, **params):
 
 
 def create_activites_table():
-    print("creating activities table")
+    logger.info("Creating activities table")
     engine = get_engine()
     with engine.begin() as connection:
         connection.execute(
@@ -86,13 +96,13 @@ def create_activites_table():
 
 def get_standard(refresh=False):
     if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
-        print("getting standard")
+        logger.info("Getting standard")
         iatikit.download.standard()
 
 
 def get_registry(refresh=False):
     if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
-        print("getting regisitry data")
+        logger.info("Getting registry data")
         iatikit.download.data()
     return iatikit.data()
 
@@ -256,19 +266,19 @@ def save_part(data):
     bucket_num, datasets = data
 
     with tempfile.TemporaryDirectory() as tmpdirname:
-        print(bucket_num, tmpdirname)
+        logger.debug(f"{bucket_num}, {tmpdirname}")
         with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
             csv_file = csv.writer(f)
 
             for num, dataset in enumerate(datasets):
                 if num % 100 == 0:
-                    print(bucket_num, num, flush=True)
+                    logger.debug(f"{bucket_num}, {num}")
 
                 if dataset.filetype != "activity":
                     continue
 
                 if not dataset.data_path:
-                    print(dataset, "not found")
+                    logger.warning(f"Dataset '{dataset}' not found")
                     continue
 
                 path = pathlib.Path(dataset.data_path)
@@ -277,7 +287,7 @@ def save_part(data):
                 try:
                     root = dataset.etree.getroot()
                 except Exception as e:
-                    print("Error parsing XML", e)
+                    logger.error("Error parsing XML: %s", e)
                     continue
 
                 save_converted_xml_to_csv(root, csv_file, prefix, filename)
@@ -300,10 +310,10 @@ def save_all(parts=5, sample=None, refresh=False):
         if sample and num > sample:
             break
 
-    print("Loading registry data into database")
+    logger.info("Loading registry data into database...")
    with concurrent.futures.ProcessPoolExecutor() as executor:
         for job in executor.map(save_part, buckets.items()):
-            print(f"DONE {job}")
+            logger.info(f"Finished loading part {job}")
             continue
 
 
@@ -330,7 +340,7 @@ def process_activities(activities, name):
     get_standard()
 
     with tempfile.TemporaryDirectory() as tmpdirname:
-        print("converting to json")
+        logger.info("Converting to JSON")
         with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
             csv_file = csv.writer(f)
             save_converted_xml_to_csv(activities, csv_file, name, name)
@@ -496,7 +506,7 @@ def create_rows(result):
 
 
 def activity_objects():
-    print("generating activity_objects")
+    logger.info("Generating activity_objects")
 
     get_codelists_lookup()
 
@@ -519,15 +529,15 @@
         )
 
         paths_csv_file = tmpdirname + "/paths.csv"
-        print("Making CSV file")
+        logger.info("Making CSV file")
         with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
             csv_writer = csv.writer(csv_file)
             for num, result in enumerate(results):
                 if num % 10000 == 0:
-                    print(str(datetime.datetime.utcnow()), num)
+                    logger.info(f"Processed {num} results")
                 csv_writer.writerows(create_rows(result))
 
-        print("Uploading Data")
+        logger.info("Uploading data")
         with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
             dbapi_conn = connection.connection
             copy_sql = "COPY _activity_objects FROM STDIN WITH CSV"
@@ -539,7 +549,7 @@
 
 
 def schema_analysis():
-    print("Creating tables '_fields' and '_tables'")
+    logger.info("Creating tables '_fields' and '_tables'")
     create_table(
         "_object_type_aggregate",
         f"""SELECT
@@ -672,7 +682,7 @@ def create_field_sql(object_details, sqlite=False):
 
 
 def postgres_tables(drop_release_objects=False):
-    print("making postgres tables")
+    logger.info("Making postgres tables")
     object_details = defaultdict(list)
     with get_engine().begin() as connection:
         result = list(
@@ -1093,7 +1103,7 @@ def export_sqlite():
                     f',FOREIGN KEY("{name}") REFERENCES "{foreign_table}"(_link)'
                 )
 
-            print(f"importing table {object_type}")
+            logger.info(f"Importing table {object_type}")
             with open(f"{tmpdirname}/{object_type}.csv", "wb") as out:
                 dbapi_conn = connection.connection
                 copy_sql = f'COPY "{object_type.lower()}" TO STDOUT WITH (FORMAT CSV, FORCE_QUOTE *)'
@@ -1107,7 +1117,7 @@
                 CREATE TABLE "{target_object_type}" (prefix, {field_def} {' '.join(fks)}) ;
                 .import '{tmpdirname}/{object_type}.csv' "{target_object_type}" """
 
-            print(import_sql)
+            logger.debug(import_sql)
 
             subprocess.run(
                 ["sqlite3", f"{sqlite_file}"],
@@ -1252,7 +1262,7 @@ def export_bigquery():
         )
 
         for object_type, object_details in object_details.items():
-            print(f"loading {object_type}")
+            logger.info(f"Loading {object_type}")
             result = connection.execute(
                 sa.text(
                     f'SELECT to_jsonb("{object_type.lower()}") AS object FROM "{object_type.lower()}"'
@@ -1310,7 +1320,7 @@ def export_all():
     try:
         export_bigquery()
     except Exception:
-        print("Big query failed, proceeding anyway")
+        logger.warning("BigQuery export failed, proceeding anyway")
 
 
 def upload_all():
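
Note (not part of the patch): the basicConfig block added at the top of the module configures the root logger once at import time, and assigning logging.Formatter.converter = time.gmtime makes %(asctime)s render in UTC process-wide. A minimal standalone sketch of the resulting behaviour; the sample output line is illustrative:

    import logging
    import time

    # Same configuration the patch installs at module import time.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    logging.Formatter.converter = time.gmtime  # render %(asctime)s in UTC

    # In iatidata/__init__.py, logging.getLogger(__name__) yields "iatidata".
    logger = logging.getLogger("iatidata")
    logger.info("Creating activities table")
    # Expected output shape (timestamp illustrative):
    # 2024-01-01T12:00:00:INFO:iatidata:Creating activities table

Because the per-dataset progress messages are demoted from print to logger.debug, they are suppressed at the default INFO level; a caller wanting them back can raise verbosity with, for example, logging.getLogger("iatidata").setLevel(logging.DEBUG).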