From 8c4a43913caf4af47af8733db9335b38e879e373 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Tue, 27 Aug 2024 11:34:02 +0200 Subject: [PATCH] Refactor the auth mechanism selection algorithm This fixes some regressions for wrongly identified auth mechanisms when parsing the security_schemes from the openapi document. Also the whole selection decision tree is mapped onto a better function call dependency chain. (cherry picked from commit 6c6a5c2243da605459aaf0ddfc97f3a882ad0f7f) --- .../+fix_auth_selection_regressions.bugfix | 3 + pulp-glue/pulp_glue/common/openapi.py | 112 +++++++++--------- pulp-glue/tests/test_auth_provider.py | 93 +++++++++++++++ pulpcore/cli/common/generic.py | 11 +- 4 files changed, 160 insertions(+), 59 deletions(-) create mode 100644 CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix create mode 100644 pulp-glue/tests/test_auth_provider.py diff --git a/CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix b/CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix new file mode 100644 index 000000000..bf8ddd40c --- /dev/null +++ b/CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix @@ -0,0 +1,3 @@ +Fixed regressions in the auth selection algorithm of `AuthProviderBase`. +In particular, proposals requiring multiple mechanisms are ignored for now instead of considering each constituent individually, +"HTTP Bearer" and other IANA schemes are no longer interpreted as "HTTP Basic" and the empty proposal rightfully reflects no needed authentication. diff --git a/pulp-glue/pulp_glue/common/openapi.py b/pulp-glue/pulp_glue/common/openapi.py index b6ca6ba63..5548df142 100644 --- a/pulp-glue/pulp_glue/common/openapi.py +++ b/pulp-glue/pulp_glue/common/openapi.py @@ -6,6 +6,7 @@ import json import os import typing as t +from collections import defaultdict from contextlib import suppress from io import BufferedReader from urllib.parse import urljoin @@ -24,7 +25,6 @@ SAFE_METHODS = ["GET", "HEAD", "OPTIONS"] ISO_DATE_FORMAT = "%Y-%m-%d" ISO_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" -AUTH_PRIORITY = ("oauth2", "basic") class OpenAPIError(Exception): @@ -45,30 +45,6 @@ class UnsafeCallError(OpenAPIError): pass -class OpenAPISecurityScheme: - def __init__(self, security_scheme: t.Dict[str, t.Any]): - self.security_scheme = security_scheme - - self.security_type = self.security_scheme["type"] - self.description = self.security_scheme.get("description", "") - - if self.security_type == "oauth2": - try: - self.flows: t.Dict[str, t.Any] = self.security_scheme["flows"] - client_credentials: t.Optional[t.Dict[str, t.Any]] = self.flows.get( - "clientCredentials" - ) - if client_credentials: - self.flow_type: str = "clientCredentials" - self.token_url: str = client_credentials["tokenUrl"] - self.scopes: t.List[str] = list(client_credentials["scopes"].keys()) - except KeyError: - raise OpenAPIValidationError - - if self.security_type == "http": - self.scheme = self.security_scheme["scheme"] - - class AuthProviderBase: """ Base class for auth providers. @@ -78,61 +54,89 @@ class AuthProviderBase: Returned auth objects need to be compatible with `requests.auth.AuthBase`. """ - def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: + def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]: """Implement this to provide means of http basic auth.""" return None + def http_auth( + self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str] + ) -> t.Optional[requests.auth.AuthBase]: + """Select a suitable http auth scheme or return None.""" + # https://www.iana.org/assignments/http-authschemes/http-authschemes.xhtml + if security_scheme["scheme"] == "basic": + result = self.basic_auth(scopes) + if result: + return result + return None + def oauth2_client_credentials_auth( - self, flow: t.Any - ) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: + self, flow: t.Any, scopes: t.List[str] + ) -> t.Optional[requests.auth.AuthBase]: """Implement this to provide other authentication methods.""" return None + def oauth2_auth( + self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str] + ) -> t.Optional[requests.auth.AuthBase]: + """Select a suitable oauth2 flow or return None.""" + # Check flows by preference. + if "clientCredentials" in security_scheme["flows"]: + flow = security_scheme["flows"]["clientCredentials"] + # Select this flow only if it claims to provide all the necessary scopes. + # This will allow subsequent auth proposals to be considered. + if set(scopes) - set(flow["scopes"]): + return None + + result = self.oauth2_client_credentials_auth(flow, scopes) + if result: + return result + return None + def __call__( self, security: t.List[t.Dict[str, t.List[str]]], security_schemes: t.Dict[str, t.Dict[str, t.Any]], - ) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: - - authorized_schemes = [] - authorized_schemes_types = set() + ) -> t.Optional[requests.auth.AuthBase]: + # Reorder the proposals by their type to prioritize properly. + # Select only single mechanism proposals on the way. + proposed_schemes: t.Dict[str, t.Dict[str, t.List[str]]] = defaultdict(dict) for proposal in security: - for name in proposal: - authorized_schemes.append(security_schemes[name]) - authorized_schemes_types.add(security_schemes[name]["type"]) - - # Check for oauth2 scheme first - if "oauth2" in authorized_schemes_types: - for flow in authorized_schemes: - if flow["type"] == "oauth2": - oauth2_flow = OpenAPISecurityScheme(flow) - - # We "know" we'll have an outh2-flow here - if oauth2_flow.flow_type == "client_credentials": - result = self.oauth2_client_credentials_auth(oauth2_flow) + if len(proposal) == 0: + # Empty proposal: No authentication needed. Shortcut return. + return None + if len(proposal) == 1: + name, scopes = list(proposal.items())[0] + proposed_schemes[security_schemes[name]["type"]][name] = scopes + # Ignore all proposals with more than one required auth mechanism. + + # Check for auth schemes by preference. + if "oauth2" in proposed_schemes: + for name, scopes in proposed_schemes["oauth2"].items(): + result = self.oauth2_auth(security_schemes[name], scopes) if result: return result # if we get here, either no-oauth2, OR we couldn't find creds - if "http" in authorized_schemes_types: - result = self.basic_auth() - if result: - return result + if "http" in proposed_schemes: + for name, scopes in proposed_schemes["http"].items(): + result = self.http_auth(security_schemes[name], scopes) + if result: + return result + raise OpenAPIError(_("No suitable auth scheme found.")) class BasicAuthProvider(AuthProviderBase): """ - Reference Implementation for AuthProviderBase providing basic auth with `username`, `password`. + Implementation for AuthProviderBase providing basic auth with fixed `username`, `password`. """ def __init__(self, username: str, password: str): - self.username = username - self.password = password + self.auth = requests.auth.HTTPBasicAuth(username, password) - def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: - return (self.username, self.password) + def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]: + return self.auth class OpenAPI: diff --git a/pulp-glue/tests/test_auth_provider.py b/pulp-glue/tests/test_auth_provider.py new file mode 100644 index 000000000..d0db62ecc --- /dev/null +++ b/pulp-glue/tests/test_auth_provider.py @@ -0,0 +1,93 @@ +import pytest + +from pulp_glue.common.openapi import AuthProviderBase, OpenAPIError + +pytestmark = pytest.mark.glue + + +SECURITY_SCHEMES = { + "A": {"type": "http", "scheme": "bearer"}, + "B": {"type": "http", "scheme": "basic"}, + "C": { + "type": "oauth2", + "flows": { + "implicit": { + "authorizationUrl": "https://example.com/api/oauth/dialog", + "scopes": { + "write:pets": "modify pets in your account", + "read:pets": "read your pets", + }, + }, + "authorizationCode": { + "authorizationUrl": "https://example.com/api/oauth/dialog", + "tokenUrl": "https://example.com/api/oauth/token", + "scopes": { + "write:pets": "modify pets in your account", + "read:pets": "read your pets", + }, + }, + }, + }, + "D": { + "type": "oauth2", + "flows": { + "implicit": { + "authorizationUrl": "https://example.com/api/oauth/dialog", + "scopes": { + "write:pets": "modify pets in your account", + "read:pets": "read your pets", + }, + }, + "clientCredentials": { + "tokenUrl": "https://example.com/api/oauth/token", + "scopes": { + "write:pets": "modify pets in your account", + "read:pets": "read your pets", + }, + }, + }, + }, +} + + +def test_auth_provider_select_mechanism(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(AuthProviderBase, "basic_auth", lambda *args: "BASIC") + monkeypatch.setattr( + AuthProviderBase, + "oauth2_client_credentials_auth", + lambda *args: "OAUTH2_CLIENT_CREDENTIALS", + ) + + # Error if no auth scheme is available. + with pytest.raises(OpenAPIError): + AuthProviderBase()([], SECURITY_SCHEMES) + + # Error if a nonexisting mechanism is proposed. + with pytest.raises(KeyError): + AuthProviderBase()([{"foo": []}], SECURITY_SCHEMES) + + # Succeed without mechanism for an empty proposal. + assert AuthProviderBase()([{}], SECURITY_SCHEMES) is None + + # Try select a not implemented auth. + with pytest.raises(OpenAPIError): + AuthProviderBase()([{"A": []}], SECURITY_SCHEMES) + + # Ignore proposals with multiple mechanisms. + with pytest.raises(OpenAPIError): + AuthProviderBase()([{"B": [], "C": []}], SECURITY_SCHEMES) + + # Select Basic auth alone and from multiple. + assert AuthProviderBase()([{"B": []}], SECURITY_SCHEMES) == "BASIC" + assert AuthProviderBase()([{"A": []}, {"B": []}], SECURITY_SCHEMES) == "BASIC" + + # Select oauth2 client credentials alone and over basic auth if scopes match. + assert AuthProviderBase()([{"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS" + assert ( + AuthProviderBase()([{"B": []}, {"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS" + ) + assert ( + AuthProviderBase()([{"B": []}, {"D": ["read:pets"]}], SECURITY_SCHEMES) + == "OAUTH2_CLIENT_CREDENTIALS" + ) + assert AuthProviderBase()([{"B": []}, {"D": ["read:cattle"]}], SECURITY_SCHEMES) == "BASIC" diff --git a/pulpcore/cli/common/generic.py b/pulpcore/cli/common/generic.py index 50bf1d07e..7e043bb37 100644 --- a/pulpcore/cli/common/generic.py +++ b/pulpcore/cli/common/generic.py @@ -219,7 +219,7 @@ class PulpCLIAuthProvider(AuthProviderBase): def __init__(self, pulp_ctx: PulpCLIContext): self.pulp_ctx = pulp_ctx - def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: + def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]: if self.pulp_ctx.username is None: self.pulp_ctx.username = click.prompt("Username") if self.pulp_ctx.password is None: @@ -230,10 +230,10 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth return SecretStorageBasicAuth(self.pulp_ctx) else: self.pulp_ctx.password = click.prompt("Password", hide_input=True) - return (self.pulp_ctx.username, self.pulp_ctx.password) + return requests.auth.HTTPBasicAuth(self.pulp_ctx.username, self.pulp_ctx.password) def oauth2_client_credentials_auth( - self, oauth2_flow: t.Any + self, flow: t.Any, scopes: t.List[str] ) -> t.Optional[requests.auth.AuthBase]: if self.pulp_ctx.username is None: self.pulp_ctx.username = click.prompt("Username/ClientID") @@ -243,8 +243,9 @@ def oauth2_client_credentials_auth( return OAuth2ClientCredentialsAuth( client_id=self.pulp_ctx.username, client_secret=self.pulp_ctx.password, - token_url=oauth2_flow.token_url, - scopes=oauth2_flow.scopes, + token_url=flow["tokenUrl"], + # Try to request all possible scopes. + scopes=flow["scopes"], )