From daf8056519d9c509adbaa43976cef5ac1fddcb35 Mon Sep 17 00:00:00 2001
From: Duncan Dewhurst
Date: Thu, 24 Oct 2024 15:25:35 +1300
Subject: [PATCH] manage.py: Run ruff format

---
 manage.py | 491 ++++++++++++++++++++++++++++--------------------------
 1 file changed, 252 insertions(+), 239 deletions(-)

diff --git a/manage.py b/manage.py
index b543580d9..9969859b9 100755
--- a/manage.py
+++ b/manage.py
@@ -26,16 +26,16 @@ from ocdskit.schema import get_schema_fields
 
 basedir = Path(__file__).resolve().parent
-schemadir = basedir / 'schema'
-localedir = basedir / 'docs' / 'locale'
+schemadir = basedir / "schema"
+localedir = basedir / "docs" / "locale"
 
-sys.path.append(str(basedir / 'docs'))
+sys.path.append(str(basedir / "docs"))
 
 from conf import release  # noqa: E402
 
 
 def custom_warning_formatter(message, category, filename, lineno, line=None):
-    return str(message) + '\n'
+    return str(message) + "\n"
 
 
 warnings.formatwarning = custom_warning_formatter
@@ -66,54 +66,49 @@ def custom_warning_formatter(message, category, filename, lineno, line=None):
 """)
 
 common_versioned_definitions = {
-    'StringNullUriVersioned': {
-        'type': ['string', 'null'],
-        'format': 'uri',
+    "StringNullUriVersioned": {
+        "type": ["string", "null"],
+        "format": "uri",
     },
-    'StringNullDateTimeVersioned': {
-        'type': ['string', 'null'],
-        'format': 'date-time',
+    "StringNullDateTimeVersioned": {
+        "type": ["string", "null"],
+        "format": "date-time",
     },
-    'StringNullVersioned': {
-        'type': ['string', 'null'],
-        'format': None,
+    "StringNullVersioned": {
+        "type": ["string", "null"],
+        "format": None,
     },
 }
 
 recognized_types = (
     # Array
-    ['array'],
-    ['array', 'null'],  # optional string arrays
-
+    ["array"],
+    ["array", "null"],  # optional string arrays
     # Object
-    ['object'],
-    ['object', 'null'],  # /Organization/details
-
+    ["object"],
+    ["object", "null"],  # /Organization/details
     # String
-    ['string'],
-    ['string', 'null'],
-
+    ["string"],
+    ["string", "null"],
     # Literal
-    ['boolean', 'null'],
-    ['integer', 'null'],
-    ['number', 'null'],
-
+    ["boolean", "null"],
+    ["integer", "null"],
+    ["number", "null"],
     # Mixed
-    ['string', 'integer'],
-    ['string', 'integer', 'null'],
+    ["string", "integer"],
+    ["string", "integer", "null"],
 )
 
 keywords_to_remove = (
     # Metadata keywords
     # https://tools.ietf.org/html/draft-fge-json-schema-validation-00#section-6
-    'title',
-    'description',
-    'default',
-
+    "title",
+    "description",
+    "default",
     # Extended keywords
     # http://os4d.opendataservices.coop/development/schema/#extended-json-schema
-    'omitWhenMerged',
-    'wholeListMerge',
+    "omitWhenMerged",
+    "wholeListMerge",
 )
@@ -125,12 +120,12 @@ def json_load(filename, library=json, **kwargs):
 
 def json_dump(filename, data):
     """Write JSON data to the given filename."""
-    with (schemadir / filename).open('w') as f:
+    with (schemadir / filename).open("w") as f:
         json.dump(data, f, indent=2)
-        f.write('\n')
+        f.write("\n")
 
 
-def csv_load(url, delimiter=','):
+def csv_load(url, delimiter=","):
     """Load CSV data into a ``csv.DictReader`` from the given URL."""
     return csv.DictReader(StringIO(get(url).text), delimiter=delimiter)
 
@@ -138,8 +133,8 @@ def csv_load(url, delimiter=','):
 @contextmanager
 def csv_dump(filename, fieldnames):
     """Write CSV headers to the given filename, and yield a ``csv.writer``."""
-    f = (schemadir / 'codelists' / filename).open('w')
-    writer = csv.writer(f, lineterminator='\n')
+    f = (schemadir / "codelists" / filename).open("w")
+    writer = csv.writer(f, lineterminator="\n")
     writer.writerow(fieldnames)
     try:
         yield writer
@@ -164,8 +159,9 @@ def coerce_to_list(data, key):
 
 def get_metaschema():
     """Patches and returns the JSON Schema Draft 4 metaschema."""
-    return json_merge_patch.merge(json_load('metaschema/json-schema-draft-4.json'),
-                                  json_load('metaschema/meta-schema-patch.json'))
+    return json_merge_patch.merge(
+        json_load("metaschema/json-schema-draft-4.json"), json_load("metaschema/meta-schema-patch.json")
+    )
 
 
 def get_common_definition_ref(item):
@@ -180,18 +176,18 @@ def get_common_definition_ref(item):
         # And adds no keywords to the definition.
         if any(keyword not in {*keywords, *keywords_to_remove} for keyword in item):
             continue
-        return {'$ref': f'#/definitions/{name}'}
+        return {"$ref": f"#/definitions/{name}"}
 
     return None
 
 
-def add_versioned(schema, unversioned_pointers, pointer=''):
+def add_versioned(schema, unversioned_pointers, pointer=""):
     """Call ``_add_versioned`` on each field."""
-    for key, value in schema['properties'].items():
-        new_pointer = f'{pointer}/properties/{key}'
+    for key, value in schema["properties"].items():
+        new_pointer = f"{pointer}/properties/{key}"
         _add_versioned(schema, unversioned_pointers, new_pointer, key, value)
 
-    for key, value in schema.get('definitions', {}).items():
-        new_pointer = f'{pointer}/definitions/{key}'
+    for key, value in schema.get("definitions", {}).items():
+        new_pointer = f"{pointer}/definitions/{key}"
         add_versioned(value, unversioned_pointers, pointer=new_pointer)
@@ -209,102 +205,102 @@ def _add_versioned(schema, unversioned_pointers, pointer, key, value):
     if pointer in unversioned_pointers:
         return
 
-    types = coerce_to_list(value, 'type')
+    types = coerce_to_list(value, "type")
 
     # If a type is unrecognized, we might need to update this script.
     if (
-        '$ref' not in value
+        "$ref" not in value
         and types not in recognized_types
-        and not (pointer == '/definitions/Quantity/properties/value' and types == ['string', 'number', 'null'])
+        and not (pointer == "/definitions/Quantity/properties/value" and types == ["string", "number", "null"])
     ):
-        warnings.warn(f'{pointer} has unrecognized type {types}')
+        warnings.warn(f"{pointer} has unrecognized type {types}")
 
     # For example, if $ref is used.
     if not types:
         # Ignore the `amendment` field, which had no `id` field in OCDS 1.0.
-        if 'deprecated' not in value:
+        if "deprecated" not in value:
             versioned_pointer = f"{value['$ref'][1:]}/properties/id"
             # If the `id` field is on an object not in an array, it needs to be versioned (e.g. buyer/properties/id).
             if versioned_pointer in unversioned_pointers:
-                value['$ref'] = value['$ref'] + 'VersionedId'
+                value["$ref"] = value["$ref"] + "VersionedId"
         return
 
     # Reference a common versioned definition if possible, to limit the size of the schema.
    ref = get_common_definition_ref(value)
     if ref:
-        schema['properties'][key] = ref
+        schema["properties"][key] = ref
     # Iterate into objects with properties like `Item.unit`. Otherwise, version objects with no properties as a
     # whole, like `Organization.details`.
-    elif types == ['object'] and 'properties' in value:
+    elif types == ["object"] and "properties" in value:
         add_versioned(value, unversioned_pointers, pointer=pointer)
     else:
         new_value = deepcopy(value)
 
-        if types == ['array']:
-            item_types = coerce_to_list(value['items'], 'type')
+        if types == ["array"]:
+            item_types = coerce_to_list(value["items"], "type")
 
             # See https://standard.open-contracting.org/latest/en/schema/merging/#whole-list-merge
-            if value.get('wholeListMerge'):
+            if value.get("wholeListMerge"):
                 # Update `$ref` to the unversioned definition.
-                if '$ref' in value['items']:
-                    new_value['items']['$ref'] = value['items']['$ref'] + 'Unversioned'
+                if "$ref" in value["items"]:
+                    new_value["items"]["$ref"] = value["items"]["$ref"] + "Unversioned"
             # Otherwise, similarly, don't iterate over item properties.
             # See https://standard.open-contracting.org/latest/en/schema/merging/#lists
-            elif '$ref' in value['items']:
+            elif "$ref" in value["items"]:
                 # Leave `$ref` to the versioned definition.
                 return
             # Exceptional case for deprecated `Amendment.changes`.
-            elif item_types == ['object'] and pointer == '/definitions/Amendment/properties/changes':
+            elif item_types == ["object"] and pointer == "/definitions/Amendment/properties/changes":
                 return
             # Warn in case new combinations are added to the release schema.
-            elif item_types != ['string']:
+            elif item_types != ["string"]:
                 # Note: Versioning the properties of un-$ref'erenced objects in arrays isn't implemented. However,
                 # this combination hasn't occurred, with the exception of `Amendment/changes`.
                 warnings.warn(f"{pointer}/items has unexpected type {item_types}")
 
     versioned = deepcopy(versioned_template)
-    versioned['items']['properties']['value'] = new_value
-    schema['properties'][key] = versioned
+    versioned["items"]["properties"]["value"] = new_value
+    schema["properties"][key] = versioned
 
 
 def update_refs_to_unversioned_definitions(schema):
     """Replace ``$ref`` values with unversioned definitions."""
     for key, value in schema.items():
-        if key == '$ref':
-            schema[key] = value + 'Unversioned'
+        if key == "$ref":
+            schema[key] = value + "Unversioned"
         elif isinstance(value, dict):
             update_refs_to_unversioned_definitions(value)
 
 
-def get_unversioned_pointers(schema, fields, pointer=''):
+def get_unversioned_pointers(schema, fields, pointer=""):
     """Return the JSON Pointers to ``id`` fields that must not be versioned if the object is in an array."""
     if isinstance(schema, list):
         for index, item in enumerate(schema):
-            get_unversioned_pointers(item, fields, pointer=f'{pointer}/{index}')
+            get_unversioned_pointers(item, fields, pointer=f"{pointer}/{index}")
     elif isinstance(schema, dict):
         # Follows the logic of _get_merge_rules in merge.py from ocds-merge.
-        types = coerce_to_list(schema, 'type')
+        types = coerce_to_list(schema, "type")
 
         # If an array is whole list merge, its items are unversioned.
-        if 'array' in types and schema.get('wholeListMerge'):
+        if "array" in types and schema.get("wholeListMerge"):
             return
 
-        if 'array' in types and 'items' in schema:
-            item_types = coerce_to_list(schema['items'], 'type')
+        if "array" in types and "items" in schema:
+            item_types = coerce_to_list(schema["items"], "type")
 
             # If an array mixes objects and non-objects, it is whole list merge.
-            if any(item_type != 'object' for item_type in item_types):
+            if any(item_type != "object" for item_type in item_types):
                 return
 
             # If it is an array of objects, any `id` fields are unversioned.
-            if 'id' in schema['items']['properties']:
-                if hasattr(schema['items'], '__reference__'):
-                    reference = schema['items'].__reference__['$ref'][1:]
+            if "id" in schema["items"]["properties"]:
+                if hasattr(schema["items"], "__reference__"):
+                    reference = schema["items"].__reference__["$ref"][1:]
                 else:
                     reference = pointer
-                fields.add(f'{reference}/properties/id')
+                fields.add(f"{reference}/properties/id")
 
         for key, value in schema.items():
-            get_unversioned_pointers(value, fields, pointer=f'{pointer}/{key}')
+            get_unversioned_pointers(value, fields, pointer=f"{pointer}/{key}")
 
 
 def remove_omit_when_merged(schema):
@@ -314,12 +310,12 @@ def remove_omit_when_merged(schema):
             remove_omit_when_merged(item)
     elif isinstance(schema, dict):
         for key, value in schema.items():
-            if key == 'properties':
+            if key == "properties":
                 for prop in list(value):
-                    if value[prop].get('omitWhenMerged'):
+                    if value[prop].get("omitWhenMerged"):
                         del value[prop]
-                        if prop in schema['required']:
-                            schema['required'].remove(prop)
+                        if prop in schema["required"]:
+                            schema["required"].remove(prop)
             remove_omit_when_merged(value)
@@ -330,7 +326,7 @@ def remove_metadata_and_extended_keywords(schema):
             remove_metadata_and_extended_keywords(item)
     elif isinstance(schema, dict):
         for key, value in schema.items():
-            if key in {'definitions', 'properties'}:
+            if key in {"definitions", "properties"}:
                 for subschema in value.values():
                     for keyword in keywords_to_remove:
                         subschema.pop(keyword, None)
@@ -340,15 +336,17 @@ def remove_metadata_and_extended_keywords(schema):
 def get_versioned_release_schema(schema):
     """Return the versioned release schema."""
     # Update schema metadata.
-    release_with_underscores = release.replace('.', '__')
-    schema['id'] = f'https://standard.open-contracting.org/schema/{release_with_underscores}/versioned-release-validation-schema.json'
-    schema['title'] = 'Schema for a compiled, versioned Open Contracting Release.'
+    release_with_underscores = release.replace(".", "__")
+    schema["id"] = (
+        f"https://standard.open-contracting.org/schema/{release_with_underscores}/versioned-release-validation-schema.json"
+    )
+    schema["title"] = "Schema for a compiled, versioned Open Contracting Release."
 
     # Release IDs, dates and tags appear alongside values in the versioned release schema.
     remove_omit_when_merged(schema)
 
     # Create unversioned copies of all definitions.
-    unversioned_definitions = {k + 'Unversioned': deepcopy(v) for k, v in schema['definitions'].items()}
+    unversioned_definitions = {k + "Unversioned": deepcopy(v) for k, v in schema["definitions"].items()}
     update_refs_to_unversioned_definitions(unversioned_definitions)
 
     # Determine which `id` fields occur on objects in arrays.
@@ -356,17 +354,17 @@ def get_versioned_release_schema(schema):
     get_unversioned_pointers(jsonref.replace_refs(schema), unversioned_pointers)
 
     # Omit `ocid` from versioning.
-    ocid = schema['properties'].pop('ocid')
+    ocid = schema["properties"].pop("ocid")
     add_versioned(schema, unversioned_pointers)
-    schema['properties']['ocid'] = ocid
+    schema["properties"]["ocid"] = ocid
 
     # Add the common versioned definitions.
     for name, keywords in common_versioned_definitions.items():
         versioned = deepcopy(versioned_template)
         for keyword, value in keywords.items():
             if value:
-                versioned['items']['properties']['value'][keyword] = value
-        schema['definitions'][name] = versioned
+                versioned["items"]["properties"]["value"][keyword] = value
+        schema["definitions"][name] = versioned
 
     # Add missing definitions.
     while True:
@@ -376,17 +374,17 @@ def get_versioned_release_schema(schema):
         except jsonref.JsonRefError as e:
             name = e.cause.args[0]
 
-            if name.endswith('VersionedId'):
+            if name.endswith("VersionedId"):
                 # Add a copy of an definition with a versioned `id` field, using the same logic as before.
-                definition = deepcopy(schema['definitions'][name[:-11]])
-                pointer = f'/definitions/{name[:-11]}/properties/id'
+                definition = deepcopy(schema["definitions"][name[:-11]])
+                pointer = f"/definitions/{name[:-11]}/properties/id"
                 pointers = unversioned_pointers - {pointer}
-                _add_versioned(definition, pointers, pointer, 'id', definition['properties']['id'])
+                _add_versioned(definition, pointers, pointer, "id", definition["properties"]["id"])
             else:
                 # Add a copy of an definition with no versioned fields.
                 definition = unversioned_definitions[name]
 
-            schema['definitions'][name] = definition
+            schema["definitions"][name] = definition
 
     # Remove all metadata and extended keywords.
     remove_metadata_and_extended_keywords(schema)
@@ -400,7 +398,7 @@ def cli():
 
 
 @cli.command()
-@click.argument('filename')
+@click.argument("filename")
 def unused_terms(filename):
     """
     Print terms in FILENAME that don't occur in the documentation.
@@ -408,75 +406,80 @@ def unused_terms(filename):
     Can be used to remove unused terms from a glossary.
     """
     paths = []
-    for extension in ('csv', 'json', 'md'):
-        paths.extend(glob(str(basedir / 'docs' / '**' / f'*.{extension}'), recursive=True))
+    for extension in ("csv", "json", "md"):
+        paths.extend(glob(str(basedir / "docs" / "**" / f"*.{extension}"), recursive=True))
 
     corpus = []
     for path in paths:
         with open(path) as f:
             # Replace punctuation with whitespace, except in abbreviations like "e.g.".
-            corpus.append(re.sub(r'(?
 
     bulletlist = [
-        '% STARTLIST',
-        *sorted([f'- `{name}`, in any location' for name, paths in counts.items() if len(paths) > 1]),
-        *sorted([f'- `{paths[0]}`' for _, paths in counts.items() if len(paths) == 1]),
-        '% ENDLIST',
+        "% STARTLIST",
+        *sorted([f"- `{name}`, in any location" for name, paths in counts.items() if len(paths) > 1]),
+        *sorted([f"- `{paths[0]}`" for _, paths in counts.items() if len(paths) == 1]),
+        "% ENDLIST",
     ]
 
-    path = basedir / 'docs' / 'guidance' / 'map' / 'translations.md'
+    path = basedir / "docs" / "guidance" / "map" / "translations.md"
     with path.open() as f:
         contents = f.read()
 
-    with path.open('w') as f:
-        f.write(re.sub(r'% STARTLIST.+% ENDLIST', '\n'.join(bulletlist), contents, flags=re.DOTALL))
+    with path.open("w") as f:
+        f.write(re.sub(r"% STARTLIST.+% ENDLIST", "\n".join(bulletlist), contents, flags=re.DOTALL))
 
-    json_dump('meta-schema.json', get_metaschema())
-    json_dump('dereferenced-release-schema.json', jsonref_release_schema)
-    json_dump('versioned-release-validation-schema.json', get_versioned_release_schema(release_schema))
+    json_dump("meta-schema.json", get_metaschema())
+    json_dump("dereferenced-release-schema.json", jsonref_release_schema)
+    json_dump("versioned-release-validation-schema.json", get_versioned_release_schema(release_schema))
 
 
 @cli.command()
-@click.argument('file', type=click.File())
+@click.argument("file", type=click.File())
 def update_country(file):
     """
     Update country.csv from ISO 3166-1 using FILE.
@@ -579,22 +589,22 @@ def update_country(file):
 
     codes = {
         # https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2#User-assigned_code_elements
-        'XK': 'Kosovo',
+        "XK": "Kosovo",
     }
 
-    rpc = json.load(file)[0]['rpc'][0]
+    rpc = json.load(file)[0]["rpc"][0]
     offset = int(rpc[0])
     for entry in rpc[3][1]:
-        d = entry['d']
+        d = entry["d"]
 
         # Clean "Western Sahara*", "United Arab Emirates (the)", etc.
-        codes[d[str(offset + 9)]] = re.sub(r' \(the\)|\*', '', d[str(offset + 13)])
+        codes[d[str(offset + 9)]] = re.sub(r" \(the\)|\*", "", d[str(offset + 13)])
 
         # The country code appears at offsets 9 and 15. Check that they are always the same.
         if d[str(offset + 9)] != d[str(offset + 15)]:
             raise AssertionError
 
-    with open(schemadir / 'codelists' / 'country.csv', 'w') as f:
-        writer = csv.writer(f, lineterminator='\n')
-        writer.writerow(['Code', 'Title'])
+    with open(schemadir / "codelists" / "country.csv", "w") as f:
+        writer = csv.writer(f, lineterminator="\n")
+        writer.writerow(["Code", "Title"])
         for code in sorted(codes):
             writer.writerow([code, codes[code]])
@@ -607,47 +617,47 @@ def update_currency():
     # List One: Current Currency & Funds
     current_codes = {}
-    url = 'https://www.six-group.com/dam/download/financial-information/data-center/iso-currrency/amendments/lists/list_one.xml'
+    url = "https://www.six-group.com/dam/download/financial-information/data-center/iso-currrency/amendments/lists/list_one.xml"
     tree = etree.fromstring(get(url).content)  # noqa: S320 # trusted external
-    for node in tree.xpath('//CcyNtry'):
+    for node in tree.xpath("//CcyNtry"):
         # Entries like Antarctica have no universal currency.
-        if node.xpath('./Ccy'):
-            code = node.xpath('./Ccy')[0].text
-            title = node.xpath('./CcyNm')[0].text.strip()
+        if node.xpath("./Ccy"):
+            code = node.xpath("./Ccy")[0].text
+            title = node.xpath("./CcyNm")[0].text.strip()
             if code not in current_codes:
                 current_codes[code] = title
             # We should expect currency titles to be consistent across countries.
             elif current_codes[code] != title:
-                raise click.ClickException(f'expected {current_codes[code]}, got {title}')
+                raise click.ClickException(f"expected {current_codes[code]}, got {title}")
 
     # List Three: Historic Denominations (Currencies & Funds)
     historic_codes = {}
-    url = 'https://www.six-group.com/dam/download/financial-information/data-center/iso-currrency/amendments/lists/list_three.xml'
+    url = "https://www.six-group.com/dam/download/financial-information/data-center/iso-currrency/amendments/lists/list_three.xml"
     tree = etree.fromstring(get(url).content)  # noqa: S320 # trusted external
-    for node in tree.xpath('//HstrcCcyNtry'):
-        code = node.xpath('./Ccy')[0].text
-        title = node.xpath('./CcyNm')[0].text.strip()
-        valid_until = node.xpath('./WthdrwlDt')[0].text
+    for node in tree.xpath("//HstrcCcyNtry"):
+        code = node.xpath("./Ccy")[0].text
+        title = node.xpath("./CcyNm")[0].text.strip()
+        valid_until = node.xpath("./WthdrwlDt")[0].text
         # Use ISO8601 interval notation.
-        valid_until = re.sub(r'^(\d{4})-(\d{4})$', r'\1/\2', valid_until.replace(' to ', '/'))
+        valid_until = re.sub(r"^(\d{4})-(\d{4})$", r"\1/\2", valid_until.replace(" to ", "/"))
         if (
             code not in current_codes
             # Last condition: If the code is historical, use the most recent title and valid date.
-            and (code not in historic_codes or valid_until > historic_codes[code]['Valid Until'])
+            and (code not in historic_codes or valid_until > historic_codes[code]["Valid Until"])
         ):
-            historic_codes[code] = {'Title': title, 'Valid Until': valid_until}
+            historic_codes[code] = {"Title": title, "Valid Until": valid_until}
 
-    with csv_dump('currency.csv', ['Code', 'Title', 'Valid Until']) as writer:
+    with csv_dump("currency.csv", ["Code", "Title", "Valid Until"]) as writer:
         for code in sorted(current_codes):
             writer.writerow([code, current_codes[code], None])
         for code in sorted(historic_codes):
-            writer.writerow([code, historic_codes[code]['Title'], historic_codes[code]['Valid Until']])
+            writer.writerow([code, historic_codes[code]["Title"], historic_codes[code]["Valid Until"]])
 
-    release_schema = json_load('release-schema.json')
+    release_schema = json_load("release-schema.json")
     codes = sorted([*current_codes, *historic_codes])
-    release_schema['definitions']['Value']['properties']['currency']['enum'] = [*codes, None]
+    release_schema["definitions"]["Value"]["properties"]["currency"]["enum"] = [*codes, None]
 
-    json_dump('release-schema.json', release_schema)
+    json_dump("release-schema.json", release_schema)
 
 
 @cli.command()
@@ -656,14 +666,14 @@ def update_language():
     # https://www.iso.org/iso-639-language-codes.html
     # https://id.loc.gov/vocabulary/iso639-1.html
 
-    with csv_dump('language.csv', ['Code', 'Title']) as writer:
-        for row in csv_load('https://id.loc.gov/vocabulary/iso639-1.tsv', delimiter='\t'):
+    with csv_dump("language.csv", ["Code", "Title"]) as writer:
+        for row in csv_load("https://id.loc.gov/vocabulary/iso639-1.tsv", delimiter="\t"):
             # Remove parentheses, like "Greek, Modern (1453-)", and split alternatives.
-            titles = re.split(r' *\| *', re.sub(r' \(.+\)', '', row['Label (English)']))
+            titles = re.split(r" *\| *", re.sub(r" \(.+\)", "", row["Label (English)"]))
             # Remove duplication like "Ndebele, North | North Ndebele" and join alternatives using a comma instead of
             # a pipe. To preserve order, a dict without values is used instead of a set.
-            titles = ', '.join({' '.join(reversed(title.split(', '))): None for title in titles})
-            writer.writerow([row['code'], titles])
+            titles = ", ".join({" ".join(reversed(title.split(", "))): None for title in titles})
+            writer.writerow([row["code"], titles])
 
 
 @cli.command()
@@ -677,37 +687,37 @@ def update_media_type():
 
     # See "Registries included below".
     registries = [
-        'application',
-        'audio',
-        'font',
-        'image',
-        'message',
-        'model',
-        'multipart',
-        'text',
-        'video',
+        "application",
+        "audio",
+        "font",
+        "image",
+        "message",
+        "model",
+        "multipart",
+        "text",
+        "video",
     ]
 
-    with csv_dump('mediaType.csv', ['Code', 'Title']) as writer:
+    with csv_dump("mediaType.csv", ["Code", "Title"]) as writer:
         for registry in registries:
             # See "Available Formats" under each heading.
-            for row in csv_load(f'https://www.iana.org/assignments/media-types/{registry}.csv'):
-                if ' ' in row['Name']:
-                    name, message = row['Name'].split(' ', 1)
+            for row in csv_load(f"https://www.iana.org/assignments/media-types/{registry}.csv"):
+                if " " in row["Name"]:
+                    name, message = row["Name"].split(" ", 1)
                 else:
-                    name, message = row['Name'], None
+                    name, message = row["Name"], None
 
                 code = f"{registry}/{name}"
-                template = row['Template']
+                template = row["Template"]
 
                 # All messages are expected to be about deprecation and obsoletion.
                 if message:
-                    logging.warning('%s: %s', message, code)
+                    logging.warning("%s: %s", message, code)
                 # "x-emf" has "image/emf" in its "Template" value (but it is deprecated).
                 elif template and template != code:
                     raise click.ClickException(f"expected {code}, got {template}")
                 else:
                     writer.writerow([code, name])
 
-        writer.writerow(['offline/print', 'print'])
+        writer.writerow(["offline/print", "print"])
 
 
 @cli.command()
@@ -723,7 +733,7 @@ def update(ctx):
 @click.pass_context
 def check_iso_6523(ctx):
     def text(node, xpath):
-        return re.sub(r'\s+', ' ', node.xpath(xpath)[0])
+        return re.sub(r"\s+", " ", node.xpath(xpath)[0])
 
     """
     Checks PEPPOL BIS Billing 3.0's ISO 6523 ICD codelist for new codes.
@@ -735,52 +745,54 @@ def text(node, xpath):
     maximum = 213
     skipped = {92, 103, 181, 182}
 
-    response = get('https://docs.peppol.eu/poacc/billing/3.0/codelist/ICD/')
+    response = get("https://docs.peppol.eu/poacc/billing/3.0/codelist/ICD/")
 
-    divs = lxml.html.fromstring(response.content).xpath('//dd/div[@id]')
+    divs = lxml.html.fromstring(response.content).xpath("//dd/div[@id]")
     if not divs:
-        raise click.ClickException('The HTML markup of the data source has changed. Please update the script.')
+        raise click.ClickException("The HTML markup of the data source has changed. Please update the script.")
 
     rows = []
     for div in divs:
-        identifier = div.attrib['id']
+        identifier = div.attrib["id"]
         number = int(identifier)
         if number < minimum or number > maximum or number in skipped:
-            name = text(div, './strong/text()')
-            notes = text(div, './p/text()')
-            issuer = ''
+            name = text(div, "./strong/text()")
+            notes = text(div, "./p/text()")
+            issuer = ""
             # "Issuing agency: " appears at the end of the paragraph. The rest of the paragraph contains either a
             # purpose ("Intended Purpose/App. Area") or notes ("Notes on Use of Code"), with or without the label.
-            notes = re.sub(r'(Notes on Use of Code|Intended Purpose/App. Area)[: ]+', '', notes)
-            if 'Issuing agency: ' in notes:
-                notes, issuer = notes.split('Issuing agency: ')
+            notes = re.sub(r"(Notes on Use of Code|Intended Purpose/App. Area)[: ]+", "", notes)
+            if "Issuing agency: " in notes:
+                notes, issuer = notes.split("Issuing agency: ")
             rows.append([identifier, name, issuer, notes])
 
     if rows:
-        csv.writer(sys.stdout, delimiter='\t').writerows(rows)
+        csv.writer(sys.stdout, delimiter="\t").writerows(rows)
     else:
-        click.echo('No new codes found.')
+        click.echo("No new codes found.")
 
 
 def add_translation_note(path, language, domain):
     """Add a translation note to a file."""
-    base_url = 'https://standard.open-contracting.org/1.1'
+    base_url = "https://standard.open-contracting.org/1.1"
 
     with open(path) as f:
         document = lxml.html.fromstring(f.read())
 
-    translator = gettext.translation('theme', localedir, languages=[language])
+    translator = gettext.translation("theme", localedir, languages=[language])
     _ = translator.gettext
 
-    pattern = f'{base_url}/{{}}/{domain}/'
+    pattern = f"{base_url}/{{}}/{domain}/"
    response = requests.get(pattern.format(language), timeout=10)
 
     # If it's a new page, add the note to the current version of the page.
     if response.status_code == requests.codes.not_found:
-        message = _('This page was recently added to the English documentation. '
-                    'It has not yet been translated.')
+        message = _(
+            'This page was recently added to the English documentation. '
+            "It has not yet been translated."
+        )
     # If it's an existing page, add the note the last version of the page.
     else:
@@ -788,10 +800,10 @@ def add_translation_note(path, language, domain):
         xpath = '//div[@itemprop="articleBody"]'
         replacement = lxml.html.fromstring(response.content).xpath(xpath)[0]
-        replacement.make_links_absolute(f'{base_url}/{language}')
+        replacement.make_links_absolute(f"{base_url}/{language}")
 
         # Remove any existing translation notes.
-        parent = replacement.xpath('//h1')[0].getparent()
+        parent = replacement.xpath("//h1")[0].getparent()
         for div in replacement.xpath('//h1/following-sibling::div[@class="admonition note"]'):
             parent.remove(div)
 
@@ -800,19 +812,20 @@ def add_translation_note(path, language, domain):
 
         message = _(
             'This page was recently changed in the English documentation. '
-            'The changes have not yet been translated.'
+            "The changes have not yet been translated."
         )
 
     template = (
         '<div class="admonition note"><p class="first admonition-title">%(note)s</p><p class="last">'
-        '%(message)s</p></div>'
+        "%(message)s</p></div>"
     )
-    document.xpath('//h1')[0].addnext(lxml.etree.XML(template % {
-        'note': _('Note'), 'message': message % {'url': pattern.format('en')}}))
+    document.xpath("//h1")[0].addnext(
+        lxml.etree.XML(template % {"note": _("Note"), "message": message % {"url": pattern.format("en")}})
+    )
 
-    with open(path, 'wb') as f:
-        f.write(lxml.html.tostring(document, encoding='utf-8'))
+    with open(path, "wb") as f:
+        f.write(lxml.html.tostring(document, encoding="utf-8"))
 
 
 @cli.command()
@@ -826,11 +839,11 @@ def add_translation_notes():
 
     https://standard.open-contracting.org/latest/en/governance/translation/
     """
-    excluded = ('.doctrees', '_downloads', '_images', '_sources', '_static', 'codelists', 'genindex', 'search')
+    excluded = (".doctrees", "_downloads", "_images", "_sources", "_static", "codelists", "genindex", "search")
 
-    for language in ('es', 'fr'):
-        build_dir = basedir / 'build' / language
-        language_dir = localedir / language / 'LC_MESSAGES'
+    for language in ("es", "fr"):
+        build_dir = basedir / "build" / language
+        language_dir = localedir / language / "LC_MESSAGES"
 
         for root, dirs, files in os.walk(build_dir):
             # Skip Sphinx directories.
@@ -847,9 +860,9 @@ def add_translation_notes():
                 source = os.path.join(root, os.path.dirname(name))
                 domain = relative_path(build_dir, source)
 
-                path = language_dir / domain / 'index.po'
+                path = language_dir / domain / "index.po"
                 if not path.is_file():
-                    path = language_dir / f'{domain}.po'
+                    path = language_dir / f"{domain}.po"
                     if not path.is_file():
                         add_translation_note(os.path.join(root, name), language, domain)
                         continue
@@ -862,5 +875,5 @@ def add_translation_notes():
                     break
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     cli()