Add logging #28

Merged 1 commit on Feb 14, 2024
50 changes: 30 additions & 20 deletions iatidata/__init__.py
@@ -5,12 +5,14 @@
import functools
import gzip
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
import time
import zipfile
from collections import defaultdict
from io import StringIO
@@ -29,6 +31,14 @@

from iatidata import sort_iati

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.Formatter.converter = time.gmtime
logger = logging.getLogger(__name__)
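# With this configuration a log line looks like (UTC timestamps via time.gmtime):
#   2024-02-14T12:00:00:INFO:iatidata:Creating activities table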

this_dir = pathlib.Path(__file__).parent.resolve()

output_dir = os.environ.get("IATI_TABLES_OUTPUT", ".")
@@ -71,7 +81,7 @@ def create_table(table, sql, **params):


def create_activites_table():
print("creating activities table")
logger.info("Creating activities table")
engine = get_engine()
with engine.begin() as connection:
connection.execute(
@@ -86,13 +96,13 @@ def create_activites_table():

def get_standard(refresh=False):
if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
print("getting standard")
logger.info("Getting standard")
iatikit.download.standard()


def get_registry(refresh=False):
if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh:
print("getting regisitry data")
logger.info("Getting regisitry data")
iatikit.download.data()

return iatikit.data()
@@ -256,19 +266,19 @@ def save_part(data):
bucket_num, datasets = data

with tempfile.TemporaryDirectory() as tmpdirname:
print(bucket_num, tmpdirname)
logger.debug(f"{bucket_num}, {tmpdirname}")
with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
csv_file = csv.writer(f)

for num, dataset in enumerate(datasets):
if num % 100 == 0:
print(bucket_num, num, flush=True)
logger.debug(f"{bucket_num}, {num}")

if dataset.filetype != "activity":
continue

if not dataset.data_path:
print(dataset, "not found")
logger.warn(f"Dataset '{dataset}' not found")
continue

path = pathlib.Path(dataset.data_path)
@@ -277,7 +287,7 @@
try:
root = dataset.etree.getroot()
except Exception as e:
print("Error parsing XML", e)
logger.error("Error parsing XML", e)
continue

save_converted_xml_to_csv(root, csv_file, prefix, filename)
@@ -300,10 +310,10 @@ def save_all(parts=5, sample=None, refresh=False):
if sample and num > sample:
break

print("Loading registry data into database")
logger.info("Loading registry data into database...")
with concurrent.futures.ProcessPoolExecutor() as executor:
for job in executor.map(save_part, buckets.items()):
print(f"DONE {job}")
logger.info(f"Finished loading part {job}")
continue


@@ -330,7 +340,7 @@ def process_activities(activities, name):
get_standard()

with tempfile.TemporaryDirectory() as tmpdirname:
print("converting to json")
logger.info("Converting to json")
with gzip.open(f"{tmpdirname}/out.csv.gz", "wt", newline="") as f:
csv_file = csv.writer(f)
save_converted_xml_to_csv(activities, csv_file, name, name)
@@ -496,7 +506,7 @@ def create_rows(result):


def activity_objects():
print("generating activity_objects")
logger.info("Generating activity_objects")

get_codelists_lookup()

@@ -519,15 +529,15 @@ def activity_objects():
)
paths_csv_file = tmpdirname + "/paths.csv"

print("Making CSV file")
logger.info("Making CSV file")
with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
for num, result in enumerate(results):
if num % 10000 == 0:
print(str(datetime.datetime.utcnow()), num)
logger.info(f"Written {num} rows")
csv_writer.writerows(create_rows(result))

print("Uploading Data")
logger.info("Uploading Data")
with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
dbapi_conn = connection.connection
copy_sql = "COPY _activity_objects FROM STDIN WITH CSV"
@@ -539,7 +549,7 @@


def schema_analysis():
print("Creating tables '_fields' and '_tables'")
logger.info("Creating tables '_fields' and '_tables'")
create_table(
"_object_type_aggregate",
f"""SELECT
@@ -672,7 +682,7 @@ def create_field_sql(object_details, sqlite=False):


def postgres_tables(drop_release_objects=False):
print("making postgres tables")
logger.info("Making postgres tables")
object_details = defaultdict(list)
with get_engine().begin() as connection:
result = list(
@@ -1093,7 +1103,7 @@ def export_sqlite():
f',FOREIGN KEY("{name}") REFERENCES "{foreign_table}"(_link)'
)

print(f"importing table {object_type}")
logger.info(f"Importing table {object_type}")
with open(f"{tmpdirname}/{object_type}.csv", "wb") as out:
dbapi_conn = connection.connection
copy_sql = f'COPY "{object_type.lower()}" TO STDOUT WITH (FORMAT CSV, FORCE_QUOTE *)'
@@ -1107,7 +1117,7 @@
CREATE TABLE "{target_object_type}" (prefix, {field_def} {' '.join(fks)}) ;
.import '{tmpdirname}/{object_type}.csv' "{target_object_type}" """

print(import_sql)
logger.debug(import_sql)

subprocess.run(
["sqlite3", f"{sqlite_file}"],
@@ -1252,7 +1262,7 @@ def export_bigquery():
)

for object_type, object_details in object_details.items():
print(f"loading {object_type}")
logger.info(f"Loading {object_type}")
result = connection.execute(
sa.text(
f'SELECT to_jsonb("{object_type.lower()}") AS object FROM "{object_type.lower()}"'
@@ -1310,7 +1320,7 @@ def export_all():
try:
export_bigquery()
except Exception:
print("Big query failed, proceeding anyway")
logger.warn("Big query failed, proceeding anyway")


def upload_all():