From 1137a036a9693a03a805510008775c28413e2e1f Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Wed, 7 Aug 2024 16:11:55 +0300
Subject: [PATCH] Remove temporary CSV file from processing step

---
 iatidata/__init__.py | 69 ++++++++++++++++++++++++--------------------
 1 file changed, 38 insertions(+), 31 deletions(-)

diff --git a/iatidata/__init__.py b/iatidata/__init__.py
index 3a50fb8..f8d7e1c 100644
--- a/iatidata/__init__.py
+++ b/iatidata/__init__.py
@@ -1,6 +1,5 @@
 import base64
 import concurrent.futures
-import csv
 import functools
 import gzip
 import json
@@ -490,9 +489,7 @@ def traverse_object(
 
 def create_rows(
     id: int, dataset: str, prefix: str, original_object: dict[str, Any], filetype: str
-) -> list[list[Any]]:
-    rows = []
-
+) -> Iterator[dict[str, Any]]:
     if original_object is None:
         return []
 
@@ -534,7 +531,7 @@ def create_rows(
         for no_index, full in zip(parent_keys_no_index, parent_keys_list):
             object[f"_link_{no_index}"] = f"{id}.{full}"
 
-        row = dict(
+        yield dict(
            id=id,
            object_key=object_key,
            parent_keys=json.dumps(parent_keys),
@@ -544,10 +541,6 @@ def create_rows(
             ),
             filetype=filetype,
         )
-        rows.append(row)
-
-    result = [list(row.values()) for row in rows]
-    return result
 
 
 def raw_objects() -> None:
@@ -573,12 +566,12 @@ def raw_objects() -> None:
             )
         )
 
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        with engine.begin() as connection:
-            connection = connection.execution_options(
+    with engine.begin() as read_connection:
+        with engine.begin() as write_connection:
+            read_connection = read_connection.execution_options(
                 stream_results=True, max_row_buffer=1000
             )
-            results = connection.execute(
+            results = read_connection.execute(
                 text(
                     """
                     (SELECT id, dataset, prefix, object, 'activity' AS filetype FROM _raw_activity ORDER BY id)
@@ -587,25 +580,39 @@ def raw_objects() -> None:
                     """
                 )
             )
-            paths_csv_file = tmpdirname + "/paths.csv"
-
-            with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
-                csv_writer = csv.writer(csv_file)
-                for num, (id, dataset, prefix, original_object, filetype) in enumerate(
-                    results
-                ):
-                    if num % 10000 == 0:
-                        logger.debug(f"Processed {num} objects so far")
-                    csv_writer.writerows(
-                        create_rows(id, dataset, prefix, original_object, filetype)
-                    )
 
-        logger.debug("Loading processed activities from CSV file into database")
-        with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
-            dbapi_conn = connection.connection
-            copy_sql = "COPY _raw_objects FROM STDIN WITH CSV"
-            cur = dbapi_conn.cursor()
-            cur.copy_expert(copy_sql, f)
+            for num, (id, dataset, prefix, original_object, filetype) in enumerate(
+                results
+            ):
+                if num % 10000 == 0:
+                    logger.info(f"Processed {num} objects so far")
+                write_connection.execute(
+                    insert(
+                        table(
+                            "_raw_objects",
+                            column("id"),
+                            column("object_key"),
+                            column("parent_keys"),
+                            column("object_type"),
+                            column("object"),
+                            column("filetype"),
+                        )
+                    ).values(
+                        [
+                            {
+                                "id": row["id"],
+                                "object_key": row["object_key"],
+                                "parent_keys": row["parent_keys"],
+                                "object_type": row["object_type"],
+                                "object": row["object"],
+                                "filetype": row["filetype"],
+                            }
+                            for row in create_rows(
+                                id, dataset, prefix, original_object, filetype
+                            )
+                        ]
+                    )
+                )
 
 
 DATE_RE = r"^(\d{4})-(\d{2})-(\d{2})([T ](\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)((-(\d{2}):(\d{2})|Z)?))?$"
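
Reviewer note (not part of the patch): the change drops the staged gzipped-CSV-plus-COPY pipeline in favour of streaming rows from the new create_rows() generator straight into multi-row INSERTs on a second connection. The sketch below is a minimal, self-contained illustration of that pattern, assuming SQLAlchemy 1.4+; the demo_objects table, the generate_rows() helper, and the in-memory SQLite engine are invented for the demo and are not part of iatidata.

# Sketch only: demo_objects and generate_rows are stand-ins; the real code
# targets the _raw_objects table in PostgreSQL via create_rows().
from typing import Any, Iterator

from sqlalchemy import column, create_engine, insert, table, text


def generate_rows(n: int) -> Iterator[dict[str, Any]]:
    # Stand-in for create_rows(): yields one dict per row rather than
    # accumulating a list, so memory use stays flat per source object.
    for i in range(n):
        yield {"id": i, "object": f'{{"seq": {i}}}'}


engine = create_engine("sqlite://")  # in-memory database, demo only

demo_objects = table("demo_objects", column("id"), column("object"))

with engine.begin() as connection:
    connection.execute(text("CREATE TABLE demo_objects (id INTEGER, object TEXT)"))
    # insert().values() with a list of dicts renders a single multi-row
    # INSERT, mirroring write_connection.execute(insert(...).values([...]))
    # in the patch. An empty list would raise, so guard the case where a
    # source object produces no rows.
    rows = list(generate_rows(3))
    if rows:
        connection.execute(insert(demo_objects).values(rows))
    print(connection.execute(text("SELECT count(*) FROM demo_objects")).scalar_one())

On the design as I read it: opening two engine.begin() transactions lets the read side keep a server-side cursor open (stream_results=True, max_row_buffer=1000) while writes go through a separate connection, and batching one INSERT per source object keeps each statement bounded without ever materialising the full row set on disk or in memory.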