Skip to content

Commit

Permalink
Refactor the auth mechanism selection algorithm
Browse files Browse the repository at this point in the history
This fixes some regressions for wrongly identified auth mechanisms when
parsing the security_schemes from the openapi document.
Also the whole selection decision tree is mapped onto a better function
call dependency chain.

(cherry picked from commit 6c6a5c2)
  • Loading branch information
mdellweg authored and patchback[bot] committed Aug 29, 2024
1 parent 5817710 commit 8c4a439
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 59 deletions.
3 changes: 3 additions & 0 deletions CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed regressions in the auth selection algorithm of `AuthProviderBase`.
In particular, proposals requiring multiple mechanisms are ignored for now instead of considering each constituent individually,
"HTTP Bearer" and other IANA schemes are no longer interpreted as "HTTP Basic" and the empty proposal rightfully reflects no needed authentication.
112 changes: 58 additions & 54 deletions pulp-glue/pulp_glue/common/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import typing as t
from collections import defaultdict
from contextlib import suppress
from io import BufferedReader
from urllib.parse import urljoin
Expand All @@ -24,7 +25,6 @@
SAFE_METHODS = ["GET", "HEAD", "OPTIONS"]
ISO_DATE_FORMAT = "%Y-%m-%d"
ISO_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
AUTH_PRIORITY = ("oauth2", "basic")


class OpenAPIError(Exception):
Expand All @@ -45,30 +45,6 @@ class UnsafeCallError(OpenAPIError):
pass


class OpenAPISecurityScheme:
def __init__(self, security_scheme: t.Dict[str, t.Any]):
self.security_scheme = security_scheme

self.security_type = self.security_scheme["type"]
self.description = self.security_scheme.get("description", "")

if self.security_type == "oauth2":
try:
self.flows: t.Dict[str, t.Any] = self.security_scheme["flows"]
client_credentials: t.Optional[t.Dict[str, t.Any]] = self.flows.get(
"clientCredentials"
)
if client_credentials:
self.flow_type: str = "clientCredentials"
self.token_url: str = client_credentials["tokenUrl"]
self.scopes: t.List[str] = list(client_credentials["scopes"].keys())
except KeyError:
raise OpenAPIValidationError

if self.security_type == "http":
self.scheme = self.security_scheme["scheme"]


class AuthProviderBase:
"""
Base class for auth providers.
Expand All @@ -78,61 +54,89 @@ class AuthProviderBase:
Returned auth objects need to be compatible with `requests.auth.AuthBase`.
"""

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
"""Implement this to provide means of http basic auth."""
return None

def http_auth(
self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Select a suitable http auth scheme or return None."""
# https://www.iana.org/assignments/http-authschemes/http-authschemes.xhtml
if security_scheme["scheme"] == "basic":
result = self.basic_auth(scopes)
if result:
return result
return None

def oauth2_client_credentials_auth(
self, flow: t.Any
) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
self, flow: t.Any, scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Implement this to provide other authentication methods."""
return None

def oauth2_auth(
self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Select a suitable oauth2 flow or return None."""
# Check flows by preference.
if "clientCredentials" in security_scheme["flows"]:
flow = security_scheme["flows"]["clientCredentials"]
# Select this flow only if it claims to provide all the necessary scopes.
# This will allow subsequent auth proposals to be considered.
if set(scopes) - set(flow["scopes"]):
return None

result = self.oauth2_client_credentials_auth(flow, scopes)
if result:
return result
return None

def __call__(
self,
security: t.List[t.Dict[str, t.List[str]]],
security_schemes: t.Dict[str, t.Dict[str, t.Any]],
) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:

authorized_schemes = []
authorized_schemes_types = set()
) -> t.Optional[requests.auth.AuthBase]:

# Reorder the proposals by their type to prioritize properly.
# Select only single mechanism proposals on the way.
proposed_schemes: t.Dict[str, t.Dict[str, t.List[str]]] = defaultdict(dict)
for proposal in security:
for name in proposal:
authorized_schemes.append(security_schemes[name])
authorized_schemes_types.add(security_schemes[name]["type"])

# Check for oauth2 scheme first
if "oauth2" in authorized_schemes_types:
for flow in authorized_schemes:
if flow["type"] == "oauth2":
oauth2_flow = OpenAPISecurityScheme(flow)

# We "know" we'll have an outh2-flow here
if oauth2_flow.flow_type == "client_credentials":
result = self.oauth2_client_credentials_auth(oauth2_flow)
if len(proposal) == 0:
# Empty proposal: No authentication needed. Shortcut return.
return None
if len(proposal) == 1:
name, scopes = list(proposal.items())[0]
proposed_schemes[security_schemes[name]["type"]][name] = scopes
# Ignore all proposals with more than one required auth mechanism.

# Check for auth schemes by preference.
if "oauth2" in proposed_schemes:
for name, scopes in proposed_schemes["oauth2"].items():
result = self.oauth2_auth(security_schemes[name], scopes)
if result:
return result

# if we get here, either no-oauth2, OR we couldn't find creds
if "http" in authorized_schemes_types:
result = self.basic_auth()
if result:
return result
if "http" in proposed_schemes:
for name, scopes in proposed_schemes["http"].items():
result = self.http_auth(security_schemes[name], scopes)
if result:
return result

raise OpenAPIError(_("No suitable auth scheme found."))


class BasicAuthProvider(AuthProviderBase):
"""
Reference Implementation for AuthProviderBase providing basic auth with `username`, `password`.
Implementation for AuthProviderBase providing basic auth with fixed `username`, `password`.
"""

def __init__(self, username: str, password: str):
self.username = username
self.password = password
self.auth = requests.auth.HTTPBasicAuth(username, password)

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
return (self.username, self.password)
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
return self.auth


class OpenAPI:
Expand Down
93 changes: 93 additions & 0 deletions pulp-glue/tests/test_auth_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pytest

from pulp_glue.common.openapi import AuthProviderBase, OpenAPIError

pytestmark = pytest.mark.glue


SECURITY_SCHEMES = {
"A": {"type": "http", "scheme": "bearer"},
"B": {"type": "http", "scheme": "basic"},
"C": {
"type": "oauth2",
"flows": {
"implicit": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
"authorizationCode": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"tokenUrl": "https://example.com/api/oauth/token",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
},
},
"D": {
"type": "oauth2",
"flows": {
"implicit": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
"clientCredentials": {
"tokenUrl": "https://example.com/api/oauth/token",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
},
},
}


def test_auth_provider_select_mechanism(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(AuthProviderBase, "basic_auth", lambda *args: "BASIC")
monkeypatch.setattr(
AuthProviderBase,
"oauth2_client_credentials_auth",
lambda *args: "OAUTH2_CLIENT_CREDENTIALS",
)

# Error if no auth scheme is available.
with pytest.raises(OpenAPIError):
AuthProviderBase()([], SECURITY_SCHEMES)

# Error if a nonexisting mechanism is proposed.
with pytest.raises(KeyError):
AuthProviderBase()([{"foo": []}], SECURITY_SCHEMES)

# Succeed without mechanism for an empty proposal.
assert AuthProviderBase()([{}], SECURITY_SCHEMES) is None

# Try select a not implemented auth.
with pytest.raises(OpenAPIError):
AuthProviderBase()([{"A": []}], SECURITY_SCHEMES)

# Ignore proposals with multiple mechanisms.
with pytest.raises(OpenAPIError):
AuthProviderBase()([{"B": [], "C": []}], SECURITY_SCHEMES)

# Select Basic auth alone and from multiple.
assert AuthProviderBase()([{"B": []}], SECURITY_SCHEMES) == "BASIC"
assert AuthProviderBase()([{"A": []}, {"B": []}], SECURITY_SCHEMES) == "BASIC"

# Select oauth2 client credentials alone and over basic auth if scopes match.
assert AuthProviderBase()([{"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS"
assert (
AuthProviderBase()([{"B": []}, {"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS"
)
assert (
AuthProviderBase()([{"B": []}, {"D": ["read:pets"]}], SECURITY_SCHEMES)
== "OAUTH2_CLIENT_CREDENTIALS"
)
assert AuthProviderBase()([{"B": []}, {"D": ["read:cattle"]}], SECURITY_SCHEMES) == "BASIC"
11 changes: 6 additions & 5 deletions pulpcore/cli/common/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class PulpCLIAuthProvider(AuthProviderBase):
def __init__(self, pulp_ctx: PulpCLIContext):
self.pulp_ctx = pulp_ctx

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.username is None:
self.pulp_ctx.username = click.prompt("Username")
if self.pulp_ctx.password is None:
Expand All @@ -230,10 +230,10 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth
return SecretStorageBasicAuth(self.pulp_ctx)
else:
self.pulp_ctx.password = click.prompt("Password", hide_input=True)
return (self.pulp_ctx.username, self.pulp_ctx.password)
return requests.auth.HTTPBasicAuth(self.pulp_ctx.username, self.pulp_ctx.password)

def oauth2_client_credentials_auth(
self, oauth2_flow: t.Any
self, flow: t.Any, scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.username is None:
self.pulp_ctx.username = click.prompt("Username/ClientID")
Expand All @@ -243,8 +243,9 @@ def oauth2_client_credentials_auth(
return OAuth2ClientCredentialsAuth(
client_id=self.pulp_ctx.username,
client_secret=self.pulp_ctx.password,
token_url=oauth2_flow.token_url,
scopes=oauth2_flow.scopes,
token_url=flow["tokenUrl"],
# Try to request all possible scopes.
scopes=flow["scopes"],
)


Expand Down

0 comments on commit 8c4a439

Please sign in to comment.