-
Notifications
You must be signed in to change notification settings - Fork 48
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Data migration for old, unhashed tokens
Signed-off-by: Rick Elrod <[email protected]>
- Loading branch information
Showing
4 changed files
with
73 additions
and
0 deletions.
There are no files selected for viewing
13 changes: 13 additions & 0 deletions
13
ansible_base/oauth2_provider/migrations/0005_hash_existing_tokens.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from django.db import migrations | ||
|
||
from ansible_base.oauth2_provider.migrations._utils import hash_tokens | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("dab_oauth2_provider", "0004_alter_oauth2accesstoken_scope"), | ||
] | ||
|
||
operations = [ | ||
migrations.RunPython(hash_tokens), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import hashlib | ||
|
||
from ansible_base.lib.utils.hashing import hash_string | ||
|
||
|
||
def hash_tokens(apps, schema_editor): | ||
OAuth2AccessToken = apps.get_model("dab_oauth2_provider", "OAuth2AccessToken") | ||
OAuth2RefreshToken = apps.get_model("dab_oauth2_provider", "OAuth2RefreshToken") | ||
for model in (OAuth2AccessToken, OAuth2RefreshToken): | ||
for token in model.objects.all(): | ||
# Never re-hash a hashed token | ||
if len(token.token) == 64: | ||
continue | ||
hashed = hash_string(token.token, hasher=hashlib.sha256) | ||
token.token = hashed | ||
token.save() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from django.apps import apps | ||
|
||
from ansible_base.lib.utils.response import get_relative_url | ||
from ansible_base.oauth2_provider.migrations._utils import hash_tokens | ||
|
||
|
||
def test_oauth2_migrations_hash_tokens(unauthenticated_api_client, oauth2_admin_access_token): | ||
""" | ||
Force an unhashed token, run the migration function, and ensure the token is hashed. | ||
""" | ||
unhashed_token = oauth2_admin_access_token[1] | ||
oauth2_admin_access_token[0].token = unhashed_token | ||
oauth2_admin_access_token[0].save() | ||
|
||
url = get_relative_url("user-me") | ||
response = unauthenticated_api_client.get( | ||
url, | ||
headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, | ||
) | ||
# When we set the token back to unhashed, we shouldn't be able to auth with it. | ||
assert response.status_code == 401 | ||
|
||
hash_tokens(apps, None) | ||
|
||
url = get_relative_url("user-me") | ||
response = unauthenticated_api_client.get( | ||
url, | ||
headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, | ||
) | ||
# Now it's been hashed, so we can auth | ||
assert response.status_code == 200 | ||
assert response.data['username'] == oauth2_admin_access_token[0].user.username | ||
|
||
# And if we re-run the hash function again for some reason, we never double-hash | ||
hash_tokens(apps, None) | ||
|
||
url = get_relative_url("user-me") | ||
response = unauthenticated_api_client.get( | ||
url, | ||
headers={'Authorization': f'Bearer {oauth2_admin_access_token[1]}'}, | ||
) | ||
# We can still auth | ||
assert response.status_code == 200 | ||
assert response.data['username'] == oauth2_admin_access_token[0].user.username |