diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..076b624 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,19 @@ +name: Lint +on: [push, pull_request] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: Install requirements_dev.txt + run: pip install -r requirements_dev.txt + - name: Check isort + run: isort --check-only iatidata/ + - name: Check black + run: black --check iatidata/ + - name: Check flake8 + run: flake8 iatidata/ diff --git a/README.md b/README.md index 1e74c2c..264dedd 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,15 @@ Running the tests: python -m pytest iatidata/ ``` +Linting: + +``` +isort iatidata +black iatidata +flake8 iatidata +``` + + ### Web front-end Install Node JS 20. e.g. on Ubuntu: diff --git a/iatidata/__init__.py b/iatidata/__init__.py index e7ddb97..4d1fdf3 100644 --- a/iatidata/__init__.py +++ b/iatidata/__init__.py @@ -1,37 +1,36 @@ -import xmlschema +import base64 +import concurrent.futures +import csv +import datetime +import functools +import gzip import json import os -import sqlalchemy as sa -import tempfile -import functools -import datetime +import pathlib +import re import shutil -import base64 import subprocess +import tempfile import zipfile -from textwrap import dedent +from collections import defaultdict from io import StringIO -import re +from textwrap import dedent -import requests import iatikit -import pathlib -from collections import defaultdict -from lxml import etree +import requests +import sqlalchemy as sa +import xmlschema +from fastavro import parse_schema, writer from google.cloud import bigquery from google.cloud.bigquery.dataset import AccessEntry from google.oauth2 import service_account -from fastavro import parse_schema, writer - -import concurrent.futures -import csv -import gzip +from lxml import etree from iatidata import sort_iati this_dir = pathlib.Path(__file__).parent.resolve() -output_dir = os.environ.get("IATI_TABLES_OUTPUT", '.') +output_dir = os.environ.get("IATI_TABLES_OUTPUT", ".") schema = os.environ.get("IATI_TABLES_SCHEMA") @@ -75,8 +74,11 @@ def create_activites_table(): engine = get_engine() with engine.begin() as connection: connection.execute( - """DROP TABLE IF EXISTS _all_activities; - CREATE TABLE _all_activities(id SERIAL, prefix TEXT, filename TEXT, error TEXT, version TEXT, activity JSONB); + """ + DROP TABLE IF EXISTS _all_activities; + CREATE TABLE _all_activities( + id SERIAL, prefix TEXT, filename TEXT, error TEXT, version TEXT, activity JSONB + ); """ ) @@ -88,7 +90,6 @@ def get_standard(refresh=False): def get_registry(refresh=False): - if not (pathlib.Path() / "__iatikitcache__").is_dir() or refresh: print("getting regisitry data") iatikit.download.data() @@ -97,7 +98,6 @@ def get_registry(refresh=False): def flatten_schema_docs(cur, path=""): - for field, value in cur.items(): info = value.get("info") docs = info.get("docs", "") @@ -180,8 +180,10 @@ def get_codelists_lookup(): mappings.pop("Version") - mappings["Country"] = [('transaction', 'recipientcountry', '@code'), - ('recipientcountry', '@code')] + mappings["Country"] = [ + ("transaction", "recipientcountry", "@code"), + ("recipientcountry", "@code"), + ] codelists_dir = pathlib.Path() / "__iatikitcache__/standard/codelists" @@ -202,7 +204,6 @@ def get_codelists_lookup(): def save_converted_xml_to_csv(dataset_etree, csv_file, prefix=None, filename=None): - transform = etree.XSLT(etree.parse(str(this_dir / "iati-activities.xsl"))) schema = xmlschema.XMLSchema( str( @@ -251,7 +252,6 @@ def csv_file_to_db(csv_fd): def save_part(data): - engine = get_engine() bucket_num, datasets = data with tempfile.TemporaryDirectory() as tmpdirname: @@ -288,7 +288,6 @@ def save_part(data): def save_all(parts=5, sample=None, refresh=False): - create_activites_table() get_standard(refresh) @@ -307,7 +306,6 @@ def save_all(parts=5, sample=None, refresh=False): def process_registry(processes=5, sample=None, refresh=False): - if schema: engine = get_engine() engine.execute( @@ -402,9 +400,10 @@ def traverse_object(obj, emit_object, full_path=tuple(), no_index_path=tuple()): if not narrative: continue if isinstance(narrative, dict): - lang = narrative.get( - "@{http://www.w3.org/XML/1998/namespace}lang", "" - ) or "" + lang = ( + narrative.get("@{http://www.w3.org/XML/1998/namespace}lang", "") + or "" + ) narrative = f"{lang.upper()}: {narrative.get('$', '')}" narratives.append(narrative) obj["narrative"] = ", ".join(narratives) @@ -428,9 +427,16 @@ def traverse_object(obj, emit_object, full_path=tuple(), no_index_path=tuple()): new_object = {key.replace("-", ""): value for key, value in obj.items()} yield new_object, full_path, no_index_path -DATE_MAP = {'1': 'plannedstart', '2': 'actualstart', '3': 'plannedend', '4': 'actualend'} + +DATE_MAP = { + "1": "plannedstart", + "2": "actualstart", + "3": "plannedend", + "4": "actualend", +} DATE_MAP_BY_FIELD = {value: int(key) for key, value in DATE_MAP.items()} + def create_rows(result): rows = [] @@ -438,10 +444,9 @@ def create_rows(result): return [] # get activity dates before traversal remove them - activity_dates = result.activity.get('activity-date', []) or [] + activity_dates = result.activity.get("activity-date", []) or [] for object, full_path, no_index_path in traverse_object(result.activity, 1): - ( object_key, parent_keys_list, @@ -452,17 +457,17 @@ def create_rows(result): object["_link"] = f'{result.id}{"." if object_key else ""}{object_key}' object["_link_activity"] = str(result.id) - if object_type != 'activity': - object["iatiidentifier"] = result.activity.get('iati-identifier') - reporting_org = result.activity.get('reporting-org', {}) or {} - object["reportingorg_ref"] = reporting_org.get('@ref') + if object_type != "activity": + object["iatiidentifier"] = result.activity.get("iati-identifier") + reporting_org = result.activity.get("reporting-org", {}) or {} + object["reportingorg_ref"] = reporting_org.get("@ref") - if object_type == 'activity': + if object_type == "activity": for activity_date in activity_dates: if not isinstance(activity_date, dict): continue - type = activity_date.get('@type') - date = activity_date.get('@iso-date') + type = activity_date.get("@type") + date = activity_date.get("@iso-date") if type and date and type in DATE_MAP: object[DATE_MAP[type]] = date @@ -582,7 +587,6 @@ def schema_analysis(): ) for object_type, key, value_type, count in results: - order, docs = 9999, "" if object_type == "activity": @@ -591,14 +595,20 @@ def schema_analysis(): path = object_type + "_" + key if key.startswith("_link"): order = 0 - if key == '_link': - docs = "Priamry Key for this table. It is unique and used for other tables rows to join to this table." + if key == "_link": + docs = ( + "Primary Key for this table. " + "It is unique and used for other tables rows to join to this table." + ) else: docs = f"Foreign key to {key[6:]} tables `_link` field" - elif key == 'iatiidentifier': - order, docs = 1, 'A globally unique identifier for the activity.' - elif key == 'reportingorg_ref' and object_type != 'activity': - order, docs = 2, 'Machine-readable identification string for the organisation issuing the report.' + elif key == "iatiidentifier": + order, docs = 1, "A globally unique identifier for the activity." + elif key == "reportingorg_ref" and object_type != "activity": + order, docs = ( + 2, + "Machine-readable identification string for the organisation issuing the report.", + ) elif key in DATE_MAP_BY_FIELD: order, docs = DATE_MAP_BY_FIELD[key] + 2, key else: @@ -619,7 +629,10 @@ def schema_analysis(): create_table( "_tables", - 'SELECT table_name, min(field_order) table_order, max("count") as rows FROM _fields WHERE field_order > 10 GROUP BY table_name', + """ + SELECT table_name, min(field_order) table_order, max("count") as rows + FROM _fields WHERE field_order > 10 GROUP BY table_name + """, ) @@ -679,12 +692,13 @@ def postgres_tables(drop_release_objects=False): def augment_transaction(): - with get_engine().begin() as connection: connection.execute( """ drop table if exists _exchange_rates; - create table _exchange_rates(date text,rate float,Currency text, frequency text, source text, country_code text, country text); + create table _exchange_rates( + date text,rate float,Currency text, frequency text, source text, country_code text, country text + ); """ ) @@ -694,14 +708,16 @@ def augment_transaction(): ) f = StringIO(r.text) dbapi_conn = connection.connection - copy_sql = f"COPY _exchange_rates FROM STDIN WITH (FORMAT CSV, HEADER)" + copy_sql = "COPY _exchange_rates FROM STDIN WITH (FORMAT CSV, HEADER)" cur = dbapi_conn.cursor() cur.copy_expert(copy_sql, f) connection.execute( """ drop table if exists _monthly_currency; - create table _monthly_currency as select distinct on (substring(date, 1,7), currency) substring(date, 1,7) yearmonth, rate, currency from _exchange_rates; + create table _monthly_currency as + select distinct on (substring(date, 1,7), currency) substring(date, 1,7) yearmonth, rate, currency + from _exchange_rates; """ ) @@ -709,30 +725,39 @@ def augment_transaction(): "tmp_transaction_usd", connection, """ - select - t._link, case when coalesce(value_currency, activity.defaultcurrency) = 'USD' then value else value / rate end value_usd - from - transaction t + select + t._link, + case + when coalesce(value_currency, activity.defaultcurrency) = 'USD' then value + else value / rate + end value_usd + from + transaction t join - activity using (_link_activity) - left join - _monthly_currency mc on greatest(substring(value_valuedate::text, 1,7), to_char(current_date-60, 'yyyy-mm')) = yearmonth and lower(coalesce(value_currency, activity.defaultcurrency)) = lower(currency) + activity using (_link_activity) + left join _monthly_currency mc + on greatest(substring(value_valuedate::text, 1,7), to_char(current_date-60, 'yyyy-mm')) = yearmonth + and lower(coalesce(value_currency, activity.defaultcurrency)) = lower(currency) """, ) _create_table( "tmp_transaction_sector", connection, - """select distinct on (_link_transaction) _link_transaction, code, codename from transaction_sector where vocabulary is null or vocabulary in ('', '1');""", + """ + select distinct on (_link_transaction) _link_transaction, code, codename + from transaction_sector + where vocabulary is null or vocabulary in ('', '1'); + """, ) _create_table( "tmp_transaction", connection, - """SELECT - t.*, value_usd, ts.code as sector_code, ts.codename as sector_codename - FROM - transaction t + """SELECT + t.*, value_usd, ts.code as sector_code, ts.codename as sector_codename + FROM + transaction t LEFT JOIN tmp_transaction_sector ts on t._link = ts._link_transaction LEFT JOIN @@ -742,20 +767,26 @@ def augment_transaction(): result = connection.execute( """ - select + select sum(case when value_usd is not null then 1 else 0 end) value_usd, sum(case when sector_code is not null then 1 else 0 end) sector_code, sum(case when sector_codename is not null then 1 else 0 end) sector_codename - from + from tmp_transaction """ ).fetchone() connection.execute( """ - insert into _fields values('transaction', 'value_usd', 'number', %s, 'Value in USD', 10000); - insert into _fields values('transaction', 'sector_code', 'string', %s, 'Sector code for default vocabulary', 10001); - insert into _fields values('transaction', 'sector_codename', 'string', %s, 'Sector code name for default vocabulary', 10002); + insert into _fields values( + 'transaction', 'value_usd', 'number', %s, 'Value in USD', 10000 + ); + insert into _fields values( + 'transaction', 'sector_code', 'string', %s, 'Sector code for default vocabulary', 10001 + ); + insert into _fields values( + 'transaction', 'sector_codename', 'string', %s, 'Sector code name for default vocabulary', 10002 + ); """, *result, ) @@ -777,58 +808,112 @@ def transaction_breakdown(): drop table if exists transaction_breakdown; create table transaction_breakdown AS - with sector_count AS - (select _link_activity, code, codename, coalesce(percentage, 100) as percentage, count(*) over activity AS cou, sum(coalesce(percentage, 100)) over activity AS total_percentage FROM sector where coalesce(vocabulary, '1') = '1' and coalesce(percentage, 100) <> 0 window activity as (partition by _link_activity)), + with sector_count AS ( + select + _link_activity, + code, + codename, + coalesce(percentage, 100) as percentage, + count(*) over activity AS cou, + sum(coalesce(percentage, 100)) over activity AS total_percentage + FROM sector + where coalesce(vocabulary, '1') = '1' + and coalesce(percentage, 100) <> 0 window activity as (partition by _link_activity)), country_100 AS ( SELECT _link_activity from recipientcountry group by 1 having sum(coalesce(percentage, 100)) >= 100 - ), + ), - country_region AS ( + country_region AS ( select *, sum(percentage) over (partition by _link_activity) AS total_percentage from - (select prefix, _link_activity, 'country' as locationtype, code as country_code, codename as country_codename, '' as region_code , '' as region_codename, coalesce(percentage, 100) as percentage FROM recipientcountry where coalesce(percentage, 100) <> 0 + ( + select + prefix, + _link_activity, + 'country' as locationtype, + code as country_code, + codename as country_codename, + '' as region_code , + '' as region_codename, + coalesce(percentage, 100) as percentage + FROM recipientcountry where coalesce(percentage, 100) <> 0 + union all - select rr.prefix, _link_activity, 'region' as locationtype, '' , '', code as regioncode, codename , coalesce(percentage, 100) as percentage - FROM recipientregion rr - LEFT JOIN country_100 c1 using (_link_activity) - WHERE coalesce(vocabulary, '1') = '1' and coalesce(percentage, 100) <> 0 and c1._link_activity is null + + select + rr.prefix, + _link_activity, + 'region' as locationtype, + '', + '', + code as regioncode, + codename, + coalesce(percentage, 100) as percentage + FROM recipientregion rr + LEFT JOIN country_100 c1 using (_link_activity) + WHERE coalesce(vocabulary, '1') = '1' + and coalesce(percentage, 100) <> 0 + and c1._link_activity is null ) a ) - select + select t.prefix, - t._link_activity, - t._link as _link_transaction, - t.iatiidentifier, - t.reportingorg_ref, + t._link_activity, + t._link as _link_transaction, + t.iatiidentifier, + t.reportingorg_ref, t.transactiontype_code, t.transactiontype_codename, t.transactiondate_isodate, - coalesce(t.sector_code, sc.code) sector_code, + coalesce(t.sector_code, sc.code) sector_code, coalesce(t.sector_codename, sc.codename) sector_codename, coalesce(t.recipientcountry_code, cr.country_code) recipientcountry_code, - coalesce(t.recipientcountry_codename, cr.country_codename) recipientcountry_codename, + coalesce(t.recipientcountry_codename, cr.country_codename) recipientcountry_codename, coalesce(t.recipientregion_code, cr.region_code) recipientregion_code, - coalesce(t.recipientregion_codename, cr.region_codename) recipientregion_codename, - value * coalesce(sc.percentage/sc.total_percentage, 1) * coalesce(cr.percentage/cr.total_percentage, 1) AS value, + coalesce(t.recipientregion_codename, cr.region_codename) recipientregion_codename, + ( + value * + coalesce(sc.percentage/sc.total_percentage, 1) * + coalesce(cr.percentage/cr.total_percentage, 1) + ) AS value, t.value_currency, t.value_valuedate, - value_usd * coalesce(sc.percentage/sc.total_percentage, 1) * coalesce(cr.percentage/cr.total_percentage, 1) AS value_usd, - coalesce(sc.percentage/sc.total_percentage, 1) * coalesce(cr.percentage/cr.total_percentage, 1) AS percentage_used - from - transaction t - left join + ( + value_usd * + coalesce(sc.percentage/sc.total_percentage, 1) * + coalesce(cr.percentage/cr.total_percentage, 1) + ) AS value_usd, + ( + coalesce(sc.percentage/sc.total_percentage, 1) * + coalesce(cr.percentage/cr.total_percentage, 1) + ) AS percentage_used + from + transaction t + left join sector_count sc on t._link_activity = sc._link_activity and t.sector_code is null - left join - country_region cr on t._link_activity = cr._link_activity and coalesce(t.recipientregion_code, t.recipientcountry_code) is null and cr.total_percentage<>0; - - insert into _tables select 'transaction_breakdown', (select max(case when table_order = 9999 then 0 else table_order end) from _tables) count, (select count(*) from transaction_breakdown); + left join country_region cr + on t._link_activity = cr._link_activity + and coalesce(t.recipientregion_code, t.recipientcountry_code) is null + and cr.total_percentage<>0; + + insert into _tables + select + 'transaction_breakdown', + ( + select max(case when table_order = 9999 then 0 else table_order end) + from _tables + ) count, + ( + select count(*) + from transaction_breakdown + ); """ ) result = connection.execute( """ - select + select sum(case when _link_activity is not null then 1 else 0 end) _link_activity, sum(case when _link_transaction is not null then 1 else 0 end) _link_transaction, sum(case when iatiidentifier is not null then 1 else 0 end) iatiidentifier, @@ -847,32 +932,73 @@ def transaction_breakdown(): sum(case when value_valuedate is not null then 1 else 0 end) value_valuedate, sum(case when value_usd is not null then 1 else 0 end) value_usd, sum(case when percentage_used is not null then 1 else 0 end) percentage_used - from + from transaction_breakdown """ ) connection.execute( """ - insert into _fields values ('transaction_breakdown','_link_activity','string','%s','_link field', 1); - insert into _fields values ('transaction_breakdown','_link_transaction','string','%s','_link field', 2); - insert into _fields values ('transaction_breakdown','iatiidentifier','string','%s','A globally unique identifier for the activity.', 3); - insert into _fields values ('transaction_breakdown','reportingorg_ref','string','%s','Machine-readable identification string for the organisation issuing the report.', 4); - insert into _fields values ('transaction_breakdown','transactiontype_code','string','%s','Transaction Type Code', 5); - insert into _fields values ('transaction_breakdown','transactiontype_codename','string','%s','Transaction Type Codelist Name', 6); - insert into _fields values ('transaction_breakdown','transactiondate_isodate','string','%s','Transaction date', 7); - insert into _fields values ('transaction_breakdown','sector_code','string','%s','Sector code', 8); - insert into _fields values ('transaction_breakdown','sector_codename','string','%s','Sector code codelist name', 9); - insert into _fields values ('transaction_breakdown','recipientcountry_code','string','%s','Recipient Country Code', 10); - insert into _fields values ('transaction_breakdown','recipientcountry_codename','string','%s','Recipient Country Code', 11); - insert into _fields values ('transaction_breakdown','recipientregion_code','string','%s','Recipient Region Code', 12); - insert into _fields values ('transaction_breakdown','recipientregion_codename','string','%s','Recipient Region Codelist Name', 13); - insert into _fields values ('transaction_breakdown','value','number','%s','Value', 14); - insert into _fields values ('transaction_breakdown','value_currency','string','%s','Transaction Currency', 15); - insert into _fields values ('transaction_breakdown','value_valuedate','datetime','%s','Transaction Date', 16); - insert into _fields values ('transaction_breakdown','value_usd','number','%s','Value in USD', 17); - insert into _fields values ('transaction_breakdown','percentage_used','number','%s','Percentage of transaction this row represents', 18); - """, + insert into _fields values ('transaction_breakdown','_link_activity','string','%s','_link field', 1); + insert into _fields values ('transaction_breakdown','_link_transaction','string','%s','_link field', 2); + insert into _fields values ( + 'transaction_breakdown', + 'iatiidentifier', + 'string', + '%s', + 'A globally unique identifier for the activity.', + 3 + ); + insert into _fields values ( + 'transaction_breakdown', + 'reportingorg_ref', + 'string', + '%s', + 'Machine-readable identification string for the organisation issuing the report.', + 4 + ); + insert into _fields values ( + 'transaction_breakdown','transactiontype_code','string','%s','Transaction Type Code', 5 + ); + insert into _fields values ( + 'transaction_breakdown','transactiontype_codename','string','%s','Transaction Type Codelist Name', 6 + ); + insert into _fields values ( + 'transaction_breakdown','transactiondate_isodate','string','%s','Transaction date', 7 + ); + insert into _fields values ('transaction_breakdown','sector_code','string','%s','Sector code', 8); + insert into _fields values ( + 'transaction_breakdown','sector_codename','string','%s','Sector code codelist name', 9 + ); + insert into _fields values ( + 'transaction_breakdown','recipientcountry_code','string','%s','Recipient Country Code', 10 + ); + insert into _fields values ( + 'transaction_breakdown','recipientcountry_codename','string','%s','Recipient Country Code', 11 + ); + insert into _fields values ( + 'transaction_breakdown','recipientregion_code','string','%s','Recipient Region Code', 12 + ); + insert into _fields values ( + 'transaction_breakdown','recipientregion_codename','string','%s','Recipient Region Codelist Name', 13 + ); + insert into _fields values ('transaction_breakdown','value','number','%s','Value', 14); + insert into _fields values ( + 'transaction_breakdown','value_currency','string','%s','Transaction Currency', 15 + ); + insert into _fields values ( + 'transaction_breakdown','value_valuedate','datetime','%s','Transaction Date', 16 + ); + insert into _fields values ('transaction_breakdown','value_usd','number','%s','Value in USD', 17); + insert into _fields values ( + 'transaction_breakdown', + 'percentage_used', + 'number', + '%s', + 'Percentage of transaction this row represents', + 18 + ); + """, *result, ) @@ -883,7 +1009,6 @@ def sql_process(): def export_stats(): - stats_file = output_path / "stats.json" stats = {"updated": str(datetime.datetime.utcnow())} @@ -891,7 +1016,7 @@ def export_stats(): results = connection.execute( "SELECT to_json(_tables) as table FROM _tables order by table_order" ) - stats['tables'] = [row.table for row in results] + stats["tables"] = [row.table for row in results] fields = defaultdict(list) @@ -902,15 +1027,20 @@ def export_stats(): for result in results: fields[result.table_name].append(result.field_info) - stats['fields'] = fields + stats["fields"] = fields stats_file.write_text(json.dumps(stats, indent=2)) - activities = [row.iatiidentifier for row in connection.execute( - "SELECT iatiidentifier from activity group by 1" - )] + activities = [ + row.iatiidentifier + for row in connection.execute( + "SELECT iatiidentifier from activity group by 1" + ) + ] - with gzip.open(str(output_path / 'activities.json.gz'), "wt") as activities_file: + with gzip.open( + str(output_path / "activities.json.gz"), "wt" + ) as activities_file: json.dump(activities, activities_file) @@ -935,7 +1065,6 @@ def export_sqlite(): indexes = [] for object_type, object_details in object_details.items(): - target_object_type = re.sub("[^0-9a-zA-Z]+", "_", object_type.lower()) if object_type == "transaction": target_object_type = "trans" @@ -944,15 +1073,19 @@ def export_sqlite(): for num, item in enumerate(object_details): name = item["name"] - if name.startswith('_link'): - indexes.append(f'CREATE INDEX "{target_object_type}_{name}" on "{target_object_type}"("{name}");') - if name.startswith('_link_'): + if name.startswith("_link"): + indexes.append( + f'CREATE INDEX "{target_object_type}_{name}" on "{target_object_type}"("{name}");' + ) + if name.startswith("_link_"): foreign_table = name[6:] if foreign_table == "transaction": foreign_table = "trans" - if object_type == 'activity': + if object_type == "activity": continue - fks.append(f',FOREIGN KEY("{name}") REFERENCES "{foreign_table}"(_link)') + fks.append( + f',FOREIGN KEY("{name}") REFERENCES "{foreign_table}"(_link)' + ) print(f"importing table {object_type}") with open(f"{tmpdirname}/{object_type}.csv", "wb") as out: @@ -963,8 +1096,6 @@ def export_sqlite(): _, field_def = create_field_sql(object_details, sqlite=True) - - import_sql = f""" .mode csv CREATE TABLE "{target_object_type}" (prefix, {field_def} {' '.join(fks)}) ; @@ -981,11 +1112,9 @@ def export_sqlite(): os.remove(f"{tmpdirname}/{object_type}.csv") - with open(f"{tmpdirname}/fields.csv", "w") as csv_file: - dbapi_conn = connection.connection - copy_sql = f'COPY "_fields" TO STDOUT WITH (FORMAT CSV, FORCE_QUOTE *)' + copy_sql = 'COPY "_fields" TO STDOUT WITH (FORMAT CSV, FORCE_QUOTE *)' cur = dbapi_conn.cursor() cur.copy_expert(copy_sql, csv_file) @@ -1005,24 +1134,23 @@ def export_sqlite(): subprocess.run( ["sqlite3", f"{datasette_file}"], - input='\n'.join(indexes), + input="\n".join(indexes), text=True, check=True, ) subprocess.run(["gzip", "-f", "-9", f"{datasette_file}"], check=True) subprocess.run(["gzip", "-fk", "-9", f"{sqlite_file}"], check=True) - subprocess.run(["zip", f"{output_path}/iati.sqlite.zip", f"{sqlite_file}"], check=True) - + subprocess.run( + ["zip", f"{output_path}/iati.sqlite.zip", f"{sqlite_file}"], check=True + ) def export_csv(): - with get_engine().begin() as connection, zipfile.ZipFile(f"{output_dir}/iati_csv.zip", "w", compression=zipfile.ZIP_DEFLATED) as zip_file: - result = list( - connection.execute( - "SELECT table_name FROM _tables" - ) - ) + with get_engine().begin() as connection, zipfile.ZipFile( + f"{output_dir}/iati_csv.zip", "w", compression=zipfile.ZIP_DEFLATED + ) as zip_file: + result = list(connection.execute("SELECT table_name FROM _tables")) for row in result: csv_output_path = output_path / f"{row.table_name}.csv" with open(f"{csv_output_path}", "wb") as out: @@ -1069,7 +1197,6 @@ def create_avro_schema(object_type, object_details): def generate_avro_records(result, object_details): - cast_to_string = set( [field["name"] for field in object_details if field["type"] == "string"] ) @@ -1084,10 +1211,7 @@ def generate_avro_records(result, object_details): def export_bigquery(): - - json_acct_info = json.loads( - base64.b64decode(os.environ["GOOGLE_SERVICE_ACCOUNT"]) - ) + json_acct_info = json.loads(base64.b64decode(os.environ["GOOGLE_SERVICE_ACCOUNT"])) credentials = service_account.Credentials.from_service_account_info(json_acct_info) @@ -1117,7 +1241,9 @@ def export_bigquery(): ) for row in result: - object_details[row.table_name].append(dict(name=row.field, type=row.type, description=row.docs)) + object_details[row.table_name].append( + dict(name=row.field, type=row.type, description=row.docs) + ) for object_type, object_details in object_details.items(): print(f"loading {object_type}") @@ -1157,19 +1283,17 @@ def export_pgdump(): "-f", f"{output_dir}/iati.custom.pg_dump", "-n", - schema or 'public', + schema or "public", "-F", "c", - os.environ["DATABASE_URL"] + os.environ["DATABASE_URL"], ], - check=True + check=True, ) cmd = f""" pg_dump --no-owner -n {schema or 'public'} {os.environ["DATABASE_URL"]} | gzip > {output_dir}/iati.dump.gz """ - subprocess.run( - cmd, shell=True, check=True - ) + subprocess.run(cmd, shell=True, check=True) def export_all(): @@ -1179,20 +1303,32 @@ def export_all(): export_pgdump() try: export_bigquery() - except Exception as e: + except Exception: print("Big query failed, proceeding anyway") def upload_all(): - if s3_destination and s3_destination != '-': - files = ["stats.json", "iati.sqlite.gz", "iati.db.gz", - "iati.sqlite", "iati.sqlite.zip", - "activities.json.gz", "iati_csv.zip", - "iati.custom.pg_dump", "iati.dump.gz"] + if s3_destination and s3_destination != "-": + files = [ + "stats.json", + "iati.sqlite.gz", + "iati.db.gz", + "iati.sqlite", + "iati.sqlite.zip", + "activities.json.gz", + "iati_csv.zip", + "iati.custom.pg_dump", + "iati.dump.gz", + ] for file in files: - subprocess.run(["s3cmd", "put", f"{output_dir}/{file}", s3_destination], check=True) - subprocess.run(["s3cmd", "setacl", f"{s3_destination}{file}", "--acl-public"], check=True) + subprocess.run( + ["s3cmd", "put", f"{output_dir}/{file}", s3_destination], check=True + ) + subprocess.run( + ["s3cmd", "setacl", f"{s3_destination}{file}", "--acl-public"], + check=True, + ) def run_all(sample=None, refresh=True, processes=5): diff --git a/iatidata/sort_iati.py b/iatidata/sort_iati.py index 6158f0d..05c74d2 100644 --- a/iatidata/sort_iati.py +++ b/iatidata/sort_iati.py @@ -28,9 +28,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import sys from collections import OrderedDict + from lxml import etree as ET -import sys # Namespaces necessary for opening schema files namespaces = {"xsd": "http://www.w3.org/2001/XMLSchema"} diff --git a/iatidata/tests/test_iatidata.py b/iatidata/tests/test_iatidata.py index f7e1197..2374016 100644 --- a/iatidata/tests/test_iatidata.py +++ b/iatidata/tests/test_iatidata.py @@ -1,19 +1,45 @@ from collections import OrderedDict + from lxml import etree from iatidata import sort_iati_element + def test_sort_iati_element(): - input_xml = 'XXXXXXXX<narrative xml:lang="en">XXXXXXXX</narrative>' + input_xml = ( + "" + "XXXXXXXX" + '' + '' + '<narrative xml:lang="en">XXXXXXXX</narrative>' + "" + ) element = etree.fromstring(input_xml) - schema_dict = OrderedDict([ - ('iati-identifier', OrderedDict()), - ('title', OrderedDict([('narrative', OrderedDict())])), - ('activity-status', OrderedDict()), - ('transaction', OrderedDict([('transaction-type', OrderedDict()), ('transaction-date', OrderedDict())])), - ]) + schema_dict = OrderedDict( + [ + ("iati-identifier", OrderedDict()), + ("title", OrderedDict([("narrative", OrderedDict())])), + ("activity-status", OrderedDict()), + ( + "transaction", + OrderedDict( + [ + ("transaction-type", OrderedDict()), + ("transaction-date", OrderedDict()), + ] + ), + ), + ] + ) sort_iati_element(element, schema_dict) - expected_xml = b'XXXXXXXX<narrative xml:lang="en">XXXXXXXX</narrative>' + expected_xml = ( + b"" + b"XXXXXXXX" + b'<narrative xml:lang="en">XXXXXXXX</narrative>' + b'' + b'' + b"" + ) assert etree.tostring(element) == expected_xml diff --git a/requirements_dev.in b/requirements_dev.in index 73c8ac6..c15cedc 100644 --- a/requirements_dev.in +++ b/requirements_dev.in @@ -1,3 +1,6 @@ -r requirements.txt pytest +black +isort +flake8 diff --git a/requirements_dev.txt b/requirements_dev.txt index d1b7630..d13169d 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -4,6 +4,8 @@ # # pip-compile requirements_dev.in # +black==23.12.1 + # via -r requirements_dev.in boto3==1.34.19 # via -r requirements.txt botocore==1.34.19 @@ -24,7 +26,9 @@ charset-normalizer==3.3.2 # -r requirements.txt # requests click==8.1.7 - # via -r requirements.txt + # via + # -r requirements.txt + # black configparser==6.0.0 # via # -r requirements.txt @@ -37,6 +41,8 @@ exceptiongroup==1.2.0 # via pytest fastavro==1.9.3 # via -r requirements.txt +flake8==7.0.0 + # via -r requirements_dev.in google-api-core==2.15.0 # via # -r requirements.txt @@ -75,6 +81,8 @@ idna==3.6 # requests iniconfig==2.0.0 # via pytest +isort==5.13.2 + # via -r requirements_dev.in jmespath==1.0.1 # via # -r requirements.txt @@ -84,11 +92,20 @@ lxml==5.1.0 # via # -r requirements.txt # iatikit +mccabe==0.7.0 + # via flake8 +mypy-extensions==1.0.0 + # via black packaging==23.2 # via # -r requirements.txt + # black # google-cloud-bigquery # pytest +pathspec==0.12.1 + # via black +platformdirs==4.1.0 + # via black pluggy==1.4.0 # via pytest protobuf==4.25.2 @@ -107,6 +124,10 @@ pyasn1-modules==0.3.0 # via # -r requirements.txt # google-auth +pycodestyle==2.11.1 + # via flake8 +pyflakes==3.2.0 + # via flake8 pytest==7.4.4 # via -r requirements_dev.in python-dateutil==2.8.2 @@ -135,7 +156,11 @@ six==1.16.0 sqlalchemy==1.4.51 # via -r requirements.txt tomli==2.0.1 - # via pytest + # via + # black + # pytest +typing-extensions==4.9.0 + # via black urllib3==2.0.7 # via # -r requirements.txt diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8a7ae23 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[isort] +profile=black + +[flake8] +max-line-length=119