diff --git a/ansible_base/oauth2_provider/migrations/0005_hash_existing_tokens.py b/ansible_base/oauth2_provider/migrations/0005_hash_existing_tokens.py new file mode 100644 index 000000000..13849bd4f --- /dev/null +++ b/ansible_base/oauth2_provider/migrations/0005_hash_existing_tokens.py @@ -0,0 +1,13 @@ +from django.db import migrations + +from ansible_base.oauth2_provider.migrations._utils import hash_tokens + + +class Migration(migrations.Migration): + dependencies = [ + ("dab_oauth2_provider", "0004_alter_oauth2accesstoken_scope"), + ] + + operations = [ + migrations.RunPython(hash_tokens), + ] diff --git a/ansible_base/oauth2_provider/migrations/_utils.py b/ansible_base/oauth2_provider/migrations/_utils.py new file mode 100644 index 000000000..1f52a8871 --- /dev/null +++ b/ansible_base/oauth2_provider/migrations/_utils.py @@ -0,0 +1,16 @@ +import hashlib + +from ansible_base.lib.utils.hashing import hash_string + + +def hash_tokens(apps, schema_editor): + OAuth2AccessToken = apps.get_model("dab_oauth2_provider", "OAuth2AccessToken") + OAuth2RefreshToken = apps.get_model("dab_oauth2_provider", "OAuth2RefreshToken") + for model in (OAuth2AccessToken, OAuth2RefreshToken): + for token in model.objects.all(): + # Never re-hash a hashed token + if len(token.token) == 64: + continue + hashed = hash_string(token.token, hasher=hashlib.sha256) + token.token = hashed + token.save() diff --git a/test_app/tests/oauth2_provider/migrations/__init__.py b/test_app/tests/oauth2_provider/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test_app/tests/oauth2_provider/migrations/test_utils.py b/test_app/tests/oauth2_provider/migrations/test_utils.py new file mode 100644 index 000000000..d95976b5d --- /dev/null +++ b/test_app/tests/oauth2_provider/migrations/test_utils.py @@ -0,0 +1,44 @@ +from django.apps import apps + +from ansible_base.lib.utils.response import get_relative_url +from ansible_base.oauth2_provider.migrations._utils import hash_tokens + + +def test_oauth2_migrations_hash_tokens(unauthenticated_api_client, oauth2_admin_access_token): + """ + Force an unhashed token, run the migration function, and ensure the token is hashed. + """ + unhashed_token = oauth2_admin_access_token[1] + oauth2_admin_access_token[0].token = unhashed_token + oauth2_admin_access_token[0].save() + + url = get_relative_url("user-me") + response = unauthenticated_api_client.get( + url, + headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, + ) + # When we set the token back to unhashed, we shouldn't be able to auth with it. + assert response.status_code == 401 + + hash_tokens(apps, None) + + url = get_relative_url("user-me") + response = unauthenticated_api_client.get( + url, + headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, + ) + # Now it's been hashed, so we can auth + assert response.status_code == 200 + assert response.data['username'] == oauth2_admin_access_token[0].user.username + + # And if we re-run the hash function again for some reason, we never double-hash + hash_tokens(apps, None) + + url = get_relative_url("user-me") + response = unauthenticated_api_client.get( + url, + headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, + ) + # We can still auth + assert response.status_code == 200 + assert response.data['username'] == oauth2_admin_access_token[0].user.username