Skip to content

Commit

Permalink
Add test cases for AWS identity verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Moody authored and tompollard committed Jul 9, 2024
1 parent 716f8b8 commit 6e01891
Showing 1 changed file with 191 additions and 2 deletions.
193 changes: 191 additions & 2 deletions physionet-django/user/test_views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import contextlib
import datetime
import json
import logging
import os
import pdb
import re
import shutil
import time
from unittest import mock

import boto3
from django.conf import settings

from lightwave.views import DBCAL_FILE, ORIGINAL_DBCAL_FILE
Expand All @@ -16,12 +19,22 @@
from django.core import mail
from django.core.management import call_command
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import RequestFactory, TestCase
from django.test import RequestFactory, TestCase, override_settings
from django.urls import reverse
from django.utils import timezone
import requests_mock

from user.enums import TrainingStatus
from user.models import AssociatedEmail, Profile, User, Training, TrainingType, Question, TrainingQuestion
from user.models import (
AssociatedEmail,
CloudInformation,
Profile,
Question,
Training,
TrainingQuestion,
TrainingType,
User,
)
from user.views import (activate_user, edit_emails, edit_profile,
edit_password_complete, public_profile, register, user_settings,
verify_email)
Expand Down Expand Up @@ -666,3 +679,179 @@ def test_reject_training_invalid(self, mock_get_info_from_certificate_pdf):

self.assertEqual(response.status_code, 200)
self.assertEqual(self.training.status, TrainingStatus.REVIEW)


@override_settings(
AWS_VERIFICATION_BUCKET_NAME='example-bucket',
AWS_VERIFICATION_BUCKET_REGION='us-east-1',
)
class TestAWSVerification(TestCase):
"""
Test AWS user verification
"""

USER_EMAIL = '[email protected]'
AWS_ACCOUNT = '314159265359'
AWS_USERID = 'AIDAAAAAAAAAAAAAAAAAA'
AWS_USERNAME = 'tim'
AWS_ARN = 'arn:aws:iam::314159265359:user/tim'
IDENTITY_JSON = json.dumps({
'UserId': AWS_USERID,
'Account': AWS_ACCOUNT,
'Arn': AWS_ARN,
})

S3_RESPONSE_403 = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error><Code>AccessDenied</Code>'
'<Message>Access Denied</Message></Error>'
)
S3_RESPONSE_404 = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error><Code>NoSuchKey</Code>'
'<Message>The specified key does not exist.</Message></Error>'
)

def get_cloud_information(self):
user = User.objects.get(email=self.USER_EMAIL)
cloud_info, _ = CloudInformation.objects.get_or_create(user=user)
return cloud_info

def mock_s3_presign(self, response_text):
match = re.search(r'aws s3 presign s3://([\w\-]+)/(\S+) '
r'--region ([\w\-]+)',
response_text)
self.assertIsNotNone(match)

with mock.patch.dict(os.environ, {
'AWS_SHARED_CREDENTIALS_FILE': os.path.join(
settings.DEMO_FILE_ROOT, 'aws_credentials'),
'AWS_PROFILE': 'default',
'AWS_ACCESS_KEY_ID': '',
'AWS_SECRET_ACCESS_KEY': '',
'AWS_SECURITY_TOKEN': '',
'AWS_SESSION_TOKEN': '',
'AWS_DEFAULT_REGION': '',
}):
client = boto3.client('s3', region_name=match.group(3))
return client.generate_presigned_url('get_object', Params={
'Bucket': match.group(1),
'Key': match.group(2),
})

@requests_mock.Mocker()
def test_verify_success(self, mocker):
"""
Test successfully adding a valid AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = None
cloud_info.aws_userid = None
cloud_info.aws_user_arn = None
cloud_info.aws_verification_datetime = None
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud_aws'))

response = self.client.get(response['location'])
self.assertEqual(response.status_code, 200)
signed_url = self.mock_s3_presign(response.content.decode())

# Assuming signature is correct, this URL should give a 404.
# With signature missing, it should give 403.
mocker.get(
signed_url, complete_qs=True,
status_code=404, text=self.S3_RESPONSE_404,
)
mocker.get(
signed_url.split('?')[0], complete_qs=True,
status_code=403, text=self.S3_RESPONSE_403,
)
response = self.client.post(
reverse('edit_cloud_aws'),
data={'signed_url': signed_url},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud'))

cloud_info = self.get_cloud_information()
self.assertEqual(cloud_info.aws_id, self.AWS_ACCOUNT)
self.assertEqual(cloud_info.aws_userid, self.AWS_USERID)
self.assertEqual(cloud_info.aws_user_arn, self.AWS_ARN)
self.assertIsNotNone(cloud_info.aws_verification_datetime)

@requests_mock.Mocker()
def test_verify_failure(self, mocker):
"""
Test failure to add an invalid AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = None
cloud_info.aws_userid = None
cloud_info.aws_user_arn = None
cloud_info.aws_verification_datetime = None
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud_aws'))

response = self.client.get(response['location'])
self.assertEqual(response.status_code, 200)
signed_url = self.mock_s3_presign(response.content.decode())

# Assuming signature is wrong, this URL should give a 403.

mocker.get(
signed_url, complete_qs=True,
status_code=403, text=self.S3_RESPONSE_403,
)
response = self.client.post(
reverse('edit_cloud_aws'),
data={'signed_url': signed_url},
)
self.assertEqual(response.status_code, 200)

cloud_info = self.get_cloud_information()
self.assertIsNone(cloud_info.aws_id)
self.assertIsNone(cloud_info.aws_userid)
self.assertIsNone(cloud_info.aws_user_arn)
self.assertIsNone(cloud_info.aws_verification_datetime)

@requests_mock.Mocker()
def test_delete_info(self, mocker):
"""
Test deleting an existing AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = self.AWS_ACCOUNT
cloud_info.aws_userid = self.AWS_USERID
cloud_info.aws_user_arn = self.AWS_ARN
cloud_info.aws_verification_datetime = timezone.now()
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'delete-aws': ''},
)
self.assertEqual(response.status_code, 200)

cloud_info = self.get_cloud_information()
self.assertIsNone(cloud_info.aws_id)
self.assertIsNone(cloud_info.aws_userid)
self.assertIsNone(cloud_info.aws_user_arn)
self.assertIsNone(cloud_info.aws_verification_datetime)

0 comments on commit 6e01891

Please sign in to comment.