logging: Make logging more useful for iatidata
tillywoodfield committed Feb 14, 2024
1 parent e49f279 commit 9aae156
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions iatidata/__init__.py
@@ -5,12 +5,14 @@
import functools
import gzip
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
import time
import zipfile
from collections import defaultdict
from io import StringIO
@@ -29,6 +31,14 @@

from iatidata import sort_iati

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.Formatter.converter = time.gmtime
logger = logging.getLogger(__name__)

this_dir = pathlib.Path(__file__).parent.resolve()

output_dir = os.environ.get("IATI_TABLES_OUTPUT", ".")
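
For reference, a minimal sketch of the output this configuration produces (not part of the commit; the timestamp value is illustrative, and the logger name follows from this module being iatidata/__init__.py, so __name__ == "iatidata"):

    import logging
    import time

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    logging.Formatter.converter = time.gmtime  # render %(asctime)s in UTC rather than local time

    logger = logging.getLogger("iatidata")
    logger.info("Creating activities table")
    # Emits something like:
    # 2024-02-14T12:00:00:INFO:iatidata:Creating activities table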
@@ -71,7 +81,7 @@ def create_table(table, sql, **params):


def create_activites_table():
print("creating activities table")
logger.info("Creating activities table")
engine = get_engine()
with engine.begin() as connection:
connection.execute(
@@ -86,13 +96,13 @@ def create_activites_table()

def get_standard(refresh=False):
if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
print("getting standard")
logger.info("Getting standard")
iatikit.download.standard()


def get_registry(refresh=False):
if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
print("getting regisitry data")
logger.info("Getting regisitry data")
iatikit.download.data()

return iatikit.data()
Expand Down Expand Up @@ -256,19 +266,19 @@ def save_part(data):
bucket_num, datasets = data

with tempfile.TemporaryDirectory() as tmpdirname:
print(bucket_num, tmpdirname)
logger.debug(f"{bucket_num}, {tmpdirname}")
with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
csv_file = csv.writer(f)

for num, dataset in enumerate(datasets):
if num % 100 == 0:
print(bucket_num, num, flush=True)
logger.debug(f"{bucket_num}, {num}")

if dataset.filetype != "activity":
continue

if not dataset.data_path:
print(dataset, "not found")
logger.warn(f"Dataset '{dataset}' not found")
continue

path = pathlib.Path(dataset.data_path)
@@ -277,7 +287,7 @@
try:
root = dataset.etree.getroot()
except Exception as e:
print("Error parsing XML", e)
logger.error("Error parsing XML", e)
continue

save_converted_xml_to_csv(root, csv_file, prefix, filename)
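
One behavioural note on the hunk above: because basicConfig sets level=logging.INFO, the new logger.debug(...) progress messages in save_part are filtered out by default, and only the warning and error lines appear. A minimal, self-contained illustration of that filtering (logger name and messages assumed for the example, not part of the commit):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("iatidata")

    log.debug("bucket 3, row 200")        # suppressed: DEBUG is below the INFO threshold
    log.warning("Dataset 'x' not found")  # emitted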
@@ -300,10 +310,10 @@ def save_all(parts=5, sample=None, refresh=False):
if sample and num > sample:
break

print("Loading registry data into database")
logger.info("Loading registry data into database...")
with concurrent.futures.ProcessPoolExecutor() as executor:
for job in executor.map(save_part, buckets.items()):
print(f"DONE {job}")
logger.info(f"Finished loading part {job}")
continue


@@ -330,7 +340,7 @@ def process_activities(activities, name):
get_standard()

with tempfile.TemporaryDirectory() as tmpdirname:
print("converting to json")
logger.info("Converting to json")
with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
csv_file = csv.writer(f)
save_converted_xml_to_csv(activities, csv_file, name, name)
@@ -496,7 +506,7 @@ def create_rows(result):


def activity_objects():
print("generating activity_objects")
logger.info("Generating activity_objects")

get_codelists_lookup()

@@ -519,15 +529,15 @@ def activity_objects():
)
paths_csv_file = tmpdirname + "/paths.csv"

print("Making CSV file")
logger.info("Making CSV file")
with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
for num, result in enumerate(results):
if num % 10000 == 0:
print(str(datetime.datetime.utcnow()), num)
logger.info(f"Written {num} rows")
csv_writer.writerows(create_rows(result))

print("Uploading Data")
logger.info("Uploading Data")
with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
dbapi_conn = connection.connection
copy_sql = "COPY _activity_objects FROM STDIN WITH CSV"
@@ -539,7 +549,7 @@


def schema_analysis():
print("Creating tables '_fields' and '_tables'")
logger.info("Creating tables '_fields' and '_tables'")
create_table(
"_object_type_aggregate",
f"""SELECT
@@ -672,7 +682,7 @@ def create_field_sql(object_details, sqlite=False):


def postgres_tables(drop_release_objects=False):
print("making postgres tables")
logger.info("Making postgres tables")
object_details = defaultdict(list)
with get_engine().begin() as connection:
result = list(
@@ -1093,7 +1103,7 @@ def export_sqlite():
f',FOREIGN KEY("{name}") REFERENCES "{foreign_table}"(_link)'
)

print(f"importing table {object_type}")
logger.info(f"Importing table {object_type}")
with open(f"{tmpdirname}/{object_type}.csv", "wb") as out:
dbapi_conn = connection.connection
copy_sql = f'COPY "{object_type.lower()}" TO STDOUT WITH (FORMAT CSV, FORCE_QUOTE *)'
@@ -1107,7 +1117,7 @@
CREATE TABLE "{target_object_type}" (prefix, {field_def} {' '.join(fks)}) ;
.import '{tmpdirname}/{object_type}.csv' "{target_object_type}" """

print(import_sql)
logger.debug(import_sql)

subprocess.run(
["sqlite3", f"{sqlite_file}"],
@@ -1252,7 +1262,7 @@ def export_bigquery():
)

for object_type, object_details in object_details.items():
print(f"loading {object_type}")
logger.info(f"Loading {object_type}")
result = connection.execute(
sa.text(
f'SELECT to_jsonb("{object_type.lower()}") AS object FROM "{object_type.lower()}"'
@@ -1310,7 +1320,7 @@ def export_all():
try:
export_bigquery()
except Exception:
print("Big query failed, proceeding anyway")
logger.warn("Big query failed, proceeding anyway")


def upload_all():
