From 1081ea7b897625d24c8ceda209bcf809c6b7269c Mon Sep 17 00:00:00 2001 From: Yusuf Musleh Date: Mon, 23 Oct 2023 02:07:25 +0300 Subject: [PATCH] feat: CRUD API for Taxonomy Tags [FC-0036] (#96) * Implements add, update, delete taxonomy tag api/rest + tests * resyncs ObjectTags when tag added or updated. * Updates Taxonomy Tags permissions to use Taxonomy permissions * Bumps version to 0.2.6 --- openedx_learning/__init__.py | 2 +- openedx_tagging/core/tagging/api.py | 55 ++ openedx_tagging/core/tagging/models/base.py | 99 ++++ .../core/tagging/rest_api/v1/permissions.py | 26 +- .../core/tagging/rest_api/v1/serializers.py | 37 +- .../core/tagging/rest_api/v1/views.py | 195 ++++++- openedx_tagging/core/tagging/rules.py | 28 +- .../core/tagging/test_rules.py | 4 +- .../core/tagging/test_views.py | 542 +++++++++++++++++- 9 files changed, 950 insertions(+), 38 deletions(-) diff --git a/openedx_learning/__init__.py b/openedx_learning/__init__.py index d88e37a3..b602ca30 100644 --- a/openedx_learning/__init__.py +++ b/openedx_learning/__init__.py @@ -1,4 +1,4 @@ """ Open edX Learning ("Learning Core"). """ -__version__ = "0.2.5" +__version__ = "0.2.6" diff --git a/openedx_tagging/core/tagging/api.py b/openedx_tagging/core/tagging/api.py index e9140968..3316b32b 100644 --- a/openedx_tagging/core/tagging/api.py +++ b/openedx_tagging/core/tagging/api.py @@ -135,6 +135,7 @@ def resync_object_tags(object_tags: QuerySet | None = None) -> int: if changed: object_tag.save() num_changed += 1 + return num_changed @@ -315,3 +316,57 @@ def autocomplete_tags( # remove repeats .distinct() ) + + +def add_tag_to_taxonomy( + taxonomy: Taxonomy, + tag: str, + parent_tag_value: str | None = None, + external_id: str | None = None +) -> Tag: + """ + Adds a new Tag to provided Taxonomy. If a Tag already exists in the + Taxonomy, an exception is raised, otherwise the newly created + Tag is returned + """ + taxonomy = taxonomy.cast() + new_tag = taxonomy.add_tag(tag, parent_tag_value, external_id) + + # Resync all related ObjectTags after creating new Tag to + # to ensure any existing ObjectTags with the same value will + # be linked to the new Tag + object_tags = taxonomy.objecttag_set.all() + resync_object_tags(object_tags) + + return new_tag + + +def update_tag_in_taxonomy(taxonomy: Taxonomy, tag: str, new_value: str): + """ + Update a Tag that belongs to a Taxonomy. The related ObjectTags are + updated accordingly. + + Currently only supports updating the Tag value. + """ + taxonomy = taxonomy.cast() + updated_tag = taxonomy.update_tag(tag, new_value) + + # Resync all related ObjectTags to update to the new Tag value + object_tags = taxonomy.objecttag_set.all() + resync_object_tags(object_tags) + + return updated_tag + + +def delete_tags_from_taxonomy( + taxonomy: Taxonomy, + tags: list[str], + with_subtags: bool +): + """ + Delete Tags that belong to a Taxonomy. If any of the Tags have children and + the `with_subtags` is not set to `True` it will fail, otherwise + the sub-tags will be deleted as well. + """ + taxonomy = taxonomy.cast() + taxonomy.delete_tags(tags, with_subtags) diff --git a/openedx_tagging/core/tagging/models/base.py b/openedx_tagging/core/tagging/models/base.py index d24d67d4..4ecf52c5 100644 --- a/openedx_tagging/core/tagging/models/base.py +++ b/openedx_tagging/core/tagging/models/base.py @@ -342,6 +342,105 @@ def get_filtered_tags( return tag_set.order_by("value", "id") + def add_tag( + self, + tag_value: str, + parent_tag_value: str | None = None, + external_id: str | None = None + ) -> Tag: + """ + Add new Tag to Taxonomy. If an existing Tag with the `tag_value` already + exists in the Taxonomy, an exception is raised, otherwise the newly + created Tag is returned + """ + self.check_casted() + + if self.allow_free_text: + raise ValueError( + "add_tag() doesn't work for free text taxonomies. They don't use Tag instances." + ) + + if self.system_defined: + raise ValueError( + "add_tag() doesn't work for system defined taxonomies. They cannot be modified." + ) + + if self.tag_set.filter(value__iexact=tag_value).exists(): + raise ValueError(f"Tag with value '{tag_value}' already exists for taxonomy.") + + parent = None + if parent_tag_value: + # Get parent tag from taxonomy, raises Tag.DoesNotExist if doesn't + # belong to taxonomy + parent = self.tag_set.get(value__iexact=parent_tag_value) + + tag = Tag.objects.create( + taxonomy=self, value=tag_value, parent=parent, external_id=external_id + ) + + return tag + + def update_tag(self, tag: str, new_value: str) -> Tag: + """ + Update an existing Tag in Taxonomy and return it. Currently only + supports updating the Tag's value. + """ + self.check_casted() + + if self.allow_free_text: + raise ValueError( + "update_tag() doesn't work for free text taxonomies. They don't use Tag instances." + ) + + if self.system_defined: + raise ValueError( + "update_tag() doesn't work for system defined taxonomies. They cannot be modified." + ) + + # Update Tag instance with new value, raises Tag.DoesNotExist if + # tag doesn't belong to taxonomy + tag_to_update = self.tag_set.get(value__iexact=tag) + tag_to_update.value = new_value + tag_to_update.save() + return tag_to_update + + def delete_tags(self, tags: List[str], with_subtags: bool = False): + """ + Delete the Taxonomy Tags provided. If any of them have children and + the `with_subtags` is not set to `True` it will fail, otherwise + the sub-tags will be deleted as well. + """ + self.check_casted() + + if self.allow_free_text: + raise ValueError( + "delete_tags() doesn't work for free text taxonomies. They don't use Tag instances." + ) + + if self.system_defined: + raise ValueError( + "delete_tags() doesn't work for system defined taxonomies. They cannot be modified." + ) + + tags_to_delete = self.tag_set.filter(value__in=tags) + + if tags_to_delete.count() != len(tags): + # If they do not match that means there is one or more Tag ID(s) + # provided that do not belong to this Taxonomy + raise ValueError("Invalid tag id provided or tag id does not belong to taxonomy") + + # Check if any Tag contains subtags (children) + contains_children = tags_to_delete.filter(children__isnull=False).distinct().exists() + + if contains_children and not with_subtags: + raise ValueError( + "Tag(s) contain children, `with_subtags` must be `True` for " + "all Tags and their subtags (children) to be deleted." + ) + + # Delete the Tags with their subtags if any + tags_to_delete.delete() + def validate_value(self, value: str) -> bool: """ Check if 'value' is part of this Taxonomy. diff --git a/openedx_tagging/core/tagging/rest_api/v1/permissions.py b/openedx_tagging/core/tagging/rest_api/v1/permissions.py index ed184549..b63a6b7e 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/permissions.py +++ b/openedx_tagging/core/tagging/rest_api/v1/permissions.py @@ -4,6 +4,8 @@ import rules # type: ignore[import] from rest_framework.permissions import DjangoObjectPermissions +from ...models import Tag + class TaxonomyObjectPermissions(DjangoObjectPermissions): """ @@ -35,22 +37,24 @@ class ObjectTagObjectPermissions(DjangoObjectPermissions): } -class TagListPermissions(DjangoObjectPermissions): +class TagObjectPermissions(DjangoObjectPermissions): """ - Permissions for Tag object views. + Maps each REST API methods to its corresponding Tag permission. """ - def has_permission(self, request, view): - """ - Returns True if the user on the given request is allowed the given view. - """ - if not request.user or ( - not request.user.is_authenticated and self.authenticated_users_only - ): - return False - return True + perms_map = { + "GET": ["%(app_label)s.view_%(model_name)s"], + "OPTIONS": [], + "HEAD": ["%(app_label)s.view_%(model_name)s"], + "POST": ["%(app_label)s.add_%(model_name)s"], + "PUT": ["%(app_label)s.change_%(model_name)s"], + "PATCH": ["%(app_label)s.change_%(model_name)s"], + "DELETE": ["%(app_label)s.delete_%(model_name)s"], + } + # This is to handle the special case for GET list of Taxonomy Tags def has_object_permission(self, request, view, obj): """ Returns True if the user on the given request is allowed the given view for the given object. """ + obj = obj.taxonomy if isinstance(obj, Tag) else obj return rules.has_perm("oel_tagging.list_tag", request.user, obj) diff --git a/openedx_tagging/core/tagging/rest_api/v1/serializers.py b/openedx_tagging/core/tagging/rest_api/v1/serializers.py index a4eb89ff..c34c15da 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/serializers.py +++ b/openedx_tagging/core/tagging/rest_api/v1/serializers.py @@ -1,7 +1,6 @@ """ API Serializers for taxonomies """ - from rest_framework import serializers from rest_framework.reverse import reverse @@ -110,6 +109,7 @@ class Meta: "value", "taxonomy_id", "parent_id", + "external_id", "sub_tags_link", "children_count", ) @@ -120,11 +120,12 @@ def get_sub_tags_link(self, obj): """ if obj.children.count(): query_params = f"?parent_tag_id={obj.id}" + request = self.context.get("request") + url_namespace = request.resolver_match.namespace # get the namespace, usually "oel_tagging" url = ( - reverse("oel_tagging:taxonomy-tags", args=[str(obj.taxonomy_id)]) + reverse(f"{url_namespace}:taxonomy-tags", args=[str(obj.taxonomy_id)]) + query_params ) - request = self.context.get("request") return request.build_absolute_uri(url) return None @@ -192,3 +193,33 @@ def get_children_count(self, obj): Returns the number of child tags of the given tag. """ return len(obj.sub_tags) + + +class TaxonomyTagCreateBodySerializer(serializers.Serializer): # pylint: disable=abstract-method + """ + Serializer of the body for the Taxonomy Tags CREATE request + """ + + tag = serializers.CharField(required=True) + parent_tag_value = serializers.CharField(required=False) + external_id = serializers.CharField(required=False) + + +class TaxonomyTagUpdateBodySerializer(serializers.Serializer): # pylint: disable=abstract-method + """ + Serializer of the body for the Taxonomy Tags UPDATE request + """ + + tag = serializers.CharField(required=True) + updated_tag_value = serializers.CharField(required=True) + + +class TaxonomyTagDeleteBodySerializer(serializers.Serializer): # pylint: disable=abstract-method + """ + Serializer of the body for the Taxonomy Tags DELETE request + """ + + tags = serializers.ListField( + child=serializers.CharField(), required=True + ) + with_subtags = serializers.BooleanField(required=False) diff --git a/openedx_tagging/core/tagging/rest_api/v1/views.py b/openedx_tagging/core/tagging/rest_api/v1/views.py index 743f5bcd..6428043c 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views.py @@ -5,17 +5,20 @@ from django.db import models from django.http import Http404, HttpResponse -from rest_framework import mixins +from rest_framework import mixins, status from rest_framework.decorators import action from rest_framework.exceptions import MethodNotAllowed, PermissionDenied, ValidationError -from rest_framework.generics import ListAPIView +from rest_framework.generics import ListAPIView, RetrieveUpdateDestroyAPIView from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet, ModelViewSet from openedx_tagging.core.tagging.models.base import Tag from ...api import ( + TagDoesNotExist, + add_tag_to_taxonomy, create_taxonomy, + delete_tags_from_taxonomy, get_children_tags, get_object_tags, get_root_tags, @@ -23,13 +26,14 @@ get_taxonomy, search_tags, tag_object, + update_tag_in_taxonomy, ) from ...import_export.api import export_tags from ...import_export.parsers import ParserFormat from ...models import Taxonomy from ...rules import ObjectTagPermissionItem from ..paginators import SEARCH_TAGS_THRESHOLD, TAGS_THRESHOLD, DisabledTagsPagination, TagsPagination -from .permissions import ObjectTagObjectPermissions, TagListPermissions, TaxonomyObjectPermissions +from .permissions import ObjectTagObjectPermissions, TagObjectPermissions, TaxonomyObjectPermissions from .serializers import ( ObjectTagListQueryParamsSerializer, ObjectTagSerializer, @@ -41,6 +45,9 @@ TaxonomyExportQueryParamsSerializer, TaxonomyListQueryParamsSerializer, TaxonomySerializer, + TaxonomyTagCreateBodySerializer, + TaxonomyTagDeleteBodySerializer, + TaxonomyTagUpdateBodySerializer, ) from .utils import view_auth_classes @@ -395,7 +402,7 @@ def update(self, request, *args, **kwargs) -> Response: tags = body.data.get("tags", []) try: tag_object(taxonomy, tags, object_id) - except Tag.DoesNotExist as e: + except TagDoesNotExist as e: raise ValidationError from e except ValueError as e: raise ValidationError from e @@ -404,28 +411,96 @@ def update(self, request, *args, **kwargs) -> Response: @view_auth_classes -class TaxonomyTagsView(ListAPIView): +class TaxonomyTagsView(ListAPIView, RetrieveUpdateDestroyAPIView): """ - View to list tags of a taxonomy. + View to list/create/update/delete tags of a taxonomy. **List Query Parameters** - * pk (required) - The pk of the taxonomy to retrieve tags. + * id (required) - The ID of the taxonomy to retrieve tags. * parent_tag_id (optional) - Id of the tag to retrieve children tags. * page (optional) - Page number (default: 1) * page_size (optional) - Number of items per page (default: 10) **List Example Requests** - GET api/tagging/v1/taxonomy/:pk/tags - Get tags of taxonomy - GET api/tagging/v1/taxonomy/:pk/tags?parent_tag_id=30 - Get children tags of tag + GET api/tagging/v1/taxonomy/:id/tags - Get tags of taxonomy + GET api/tagging/v1/taxonomy/:id/tags?parent_tag_id=30 - Get children tags of tag **List Query Returns** * 200 - Success * 400 - Invalid query parameter * 403 - Permission denied * 404 - Taxonomy not found + + **Create Query Parameters** + * id (required) - The ID of the taxonomy to create a Tag for + + **Create Request Body** + * tag (required): The value of the Tag that should be added to + the Taxonomy + * parent_tag_value (optional): The value of the parent tag that the new + Tag should fall under + * extenal_id (optional): The external id for the new Tag + + **Create Example Requests** + POST api/tagging/v1/taxonomy/:id/tags - Create a Tag in taxonomy + { + "value": "New Tag", + "parent_tag_value": "Parent Tag" + "external_id": "abc123", + } + + **Create Query Returns** + * 201 - Success + * 400 - Invalid parameters provided + * 403 - Permission denied + * 404 - Taxonomy not found + + **Update Query Parameters** + * id (required) - The ID of the taxonomy to update a Tag in + + **Update Request Body** + * tag (required): The value (identifier) of the Tag to be updated + * updated_tag_value (required): The updated value of the Tag + + **Update Example Requests** + PATCH api/tagging/v1/taxonomy/:id/tags - Update a Tag in Taxonomy + { + "tag": "Tag 1", + "updated_tag_value": "Updated Tag Value" + } + + **Update Query Returns** + * 200 - Success + * 400 - Invalid parameters provided + * 403 - Permission denied + * 404 - Taxonomy, Tag or Parent Tag not found + + **Delete Query Parameters** + * id (required) - The ID of the taxonomy to Delete Tag(s) in + + **Delete Request Body** + * tags (required): The values (identifiers) of Tags that should be + deleted from Taxonomy + * with_subtags (optional): If a Tag in the provided ids contains + children (subtags), deletion will fail unless + set to `True`. Defaults to `False`. + + **Delete Example Requests** + DELETE api/tagging/v1/taxonomy/:id/tags - Delete Tag(s) in Taxonomy + { + "tags": ["Tag 1", "Tag 2", "Tag 3"], + "with_subtags": True + } + + **Delete Query Returns** + * 200 - Success + * 400 - Invalid parameters provided + * 403 - Permission denied + * 404 - Taxonomy not found + """ - permission_classes = [TagListPermissions] + permission_classes = [TagObjectPermissions] pagination_enabled = True def __init__(self): @@ -558,7 +633,7 @@ def get_matching_tags( return result - def get_queryset(self) -> list[Tag]: # type: ignore[override] + def get_queryset(self) -> models.QuerySet[Tag]: # type: ignore[override] """ Builds and returns the queryset to be paginated. @@ -576,7 +651,103 @@ def get_queryset(self) -> list[Tag]: # type: ignore[override] search_term=search_term, ) + # Convert the results back to a QuerySet for permissions to apply + # Due to the conversion we lose the populated `sub_tags` attribute, + # in the case of using the special search serializer so we + # need to repopulate it again + if self.serializer_class == TagsForSearchSerializer: + results_dict = {tag.id: tag for tag in result} + + result_queryset = Tag.objects.filter(id__in=results_dict.keys()) + + for tag in result_queryset: + sub_tags = results_dict[tag.id].sub_tags # type: ignore[attr-defined] + tag.sub_tags = sub_tags # type: ignore[attr-defined] + + else: + result_queryset = Tag.objects.filter(id__in=[tag.id for tag in result]) + # This function is not called automatically self.pagination_class = self.get_pagination_class() - return result + return result_queryset + + def post(self, request, *args, **kwargs): + """ + Creates new Tag in Taxonomy and returns the newly created Tag. + """ + pk = self.kwargs.get("pk") + taxonomy = self.get_taxonomy(pk) + + body = TaxonomyTagCreateBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + tag = body.data.get("tag") + parent_tag_value = body.data.get("parent_tag_value", None) + external_id = body.data.get("external_id", None) + + try: + new_tag = add_tag_to_taxonomy( + taxonomy, tag, parent_tag_value, external_id + ) + except TagDoesNotExist as e: + raise Http404("Parent Tag not found") from e + except ValueError as e: + raise ValidationError(e) from e + + self.serializer_class = TagsSerializer + serializer_context = self.get_serializer_context() + return Response( + self.serializer_class(new_tag, context=serializer_context).data, + status=status.HTTP_201_CREATED + ) + + def update(self, request, *args, **kwargs): + """ + Updates a Tag that belongs to the Taxonomy and returns it. + Currently only updating the Tag value is supported. + """ + pk = self.kwargs.get("pk") + taxonomy = self.get_taxonomy(pk) + + body = TaxonomyTagUpdateBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + tag = body.data.get("tag") + updated_tag_value = body.data.get("updated_tag_value") + + try: + updated_tag = update_tag_in_taxonomy(taxonomy, tag, updated_tag_value) + except TagDoesNotExist as e: + raise Http404("Tag not found") from e + except ValueError as e: + raise ValidationError(e) from e + + self.serializer_class = TagsSerializer + serializer_context = self.get_serializer_context() + return Response( + self.serializer_class(updated_tag, context=serializer_context).data, + status=status.HTTP_200_OK + ) + + def delete(self, request, *args, **kwargs): + """ + Deletes Tag(s) in Taxonomy. If any of the Tags have children and + the `with_subtags` is not set to `True` it will fail, otherwise + the sub-tags will be deleted as well. + """ + pk = self.kwargs.get("pk") + taxonomy = self.get_taxonomy(pk) + + body = TaxonomyTagDeleteBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + tags = body.data.get("tags") + with_subtags = body.data.get("with_subtags") + + try: + delete_tags_from_taxonomy(taxonomy, tags, with_subtags) + except ValueError as e: + raise ValidationError(e) from e + + return Response(status=status.HTTP_200_OK) diff --git a/openedx_tagging/core/tagging/rules.py b/openedx_tagging/core/tagging/rules.py index 8249628f..97ca56de 100644 --- a/openedx_tagging/core/tagging/rules.py +++ b/openedx_tagging/core/tagging/rules.py @@ -51,17 +51,27 @@ def can_change_taxonomy(user: UserType, taxonomy: Taxonomy | None = None) -> boo ) +@rules.predicate +def can_view_tag(user: UserType, tag: Tag | None = None) -> bool: + """ + User can view tags for any taxonomy they can view. + """ + taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None + return user.has_perm( + "oel_tagging.view_taxonomy", + taxonomy, + ) + + @rules.predicate def can_change_tag(user: UserType, tag: Tag | None = None) -> bool: """ - Even taxonomy admins cannot add tags to system taxonomies (their tags are system-defined), or free-text taxonomies - (these don't have predefined tags). + Users can change tags for any taxonomy they can modify. """ taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None - return is_taxonomy_admin(user) and ( - not tag - or not taxonomy - or (taxonomy and not taxonomy.allow_free_text and not taxonomy.system_defined) + return user.has_perm( + "oel_tagging.change_taxonomy", + taxonomy, ) @@ -166,8 +176,10 @@ def can_change_object_tag( # Tag rules.add_perm("oel_tagging.add_tag", can_change_tag) rules.add_perm("oel_tagging.change_tag", can_change_tag) -rules.add_perm("oel_tagging.delete_tag", is_taxonomy_admin) -rules.add_perm("oel_tagging.view_tag", rules.always_allow) +rules.add_perm("oel_tagging.delete_tag", can_change_tag) +rules.add_perm("oel_tagging.view_tag", can_view_tag) +# Special Case for listing Tags, we check if we can view the Taxonomy since +# that is what is passed in rather than a Tag object rules.add_perm("oel_tagging.list_tag", can_view_taxonomy) # ObjectTag diff --git a/tests/openedx_tagging/core/tagging/test_rules.py b/tests/openedx_tagging/core/tagging/test_rules.py index f30cbd27..11cc2ced 100644 --- a/tests/openedx_tagging/core/tagging/test_rules.py +++ b/tests/openedx_tagging/core/tagging/test_rules.py @@ -141,12 +141,12 @@ def test_add_change_tag(self, perm): ) def test_tag_free_text_taxonomy(self, perm): """ - Taxonomy administrators cannot modify tags on a free-text Taxonomy + Taxonomy administrators can modify any Tag, even those associated with a free-text Taxonomy """ self.taxonomy.allow_free_text = True self.taxonomy.save() assert self.superuser.has_perm(perm, self.bacteria) - assert not self.staff.has_perm(perm, self.bacteria) + assert self.staff.has_perm(perm, self.bacteria) assert not self.learner.has_perm(perm, self.bacteria) @ddt.data( diff --git a/tests/openedx_tagging/core/tagging/test_views.py b/tests/openedx_tagging/core/tagging/test_views.py index d97296ba..b52c0e64 100644 --- a/tests/openedx_tagging/core/tagging/test_views.py +++ b/tests/openedx_tagging/core/tagging/test_views.py @@ -897,7 +897,7 @@ def test_tag_object_count_limit(self): class TestTaxonomyTagsView(TestTaxonomyViewMixin): """ - Tests the list tags of taxonomy view + Tests the list/create/update/delete tags of taxonomy view """ fixtures = ["tests/openedx_tagging/core/fixtures/tagging.yaml"] @@ -1209,3 +1209,543 @@ def test_next_children(self): assert data.get("count") == self.children_tags_count[0] assert data.get("num_pages") == 2 assert data.get("current_page") == 2 + + def test_create_tag_in_taxonomy_while_loggedout(self): + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_create_tag_in_taxonomy_without_permission(self): + self.client.force_authenticate(user=self.user) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_create_tag_in_taxonomy(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_201_CREATED + + data = response.data + + self.assertIsNotNone(data.get("id")) + self.assertEqual(data.get("value"), new_tag_value) + self.assertEqual(data.get("taxonomy_id"), self.small_taxonomy.pk) + self.assertIsNone(data.get("parent_id")) + self.assertIsNone(data.get("external_id")) + self.assertIsNone(data.get("sub_tags_link")) + self.assertEqual(data.get("children_count"), 0) + + def test_create_tag_in_taxonomy_with_parent(self): + self.client.force_authenticate(user=self.staff) + parent_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + new_tag_value = "New Child Tag" + new_external_id = "extId" + + create_data = { + "tag": new_tag_value, + "parent_tag_value": parent_tag.value, + "external_id": new_external_id + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_201_CREATED + + data = response.data + + self.assertIsNotNone(data.get("id")) + self.assertEqual(data.get("value"), new_tag_value) + self.assertEqual(data.get("taxonomy_id"), self.small_taxonomy.pk) + self.assertEqual(data.get("parent_id"), parent_tag.id) + self.assertEqual(data.get("external_id"), new_external_id) + self.assertIsNone(data.get("sub_tags_link")) + self.assertEqual(data.get("children_count"), 0) + + def test_create_tag_in_invalid_taxonomy(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + invalid_taxonomy_url = TAXONOMY_TAGS_URL.format(pk=919191) + response = self.client.post( + invalid_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_create_tag_in_free_text_taxonomy(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + # Setting free text flag on taxonomy + self.small_taxonomy.allow_free_text = True + self.small_taxonomy.save() + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_create_tag_in_system_defined_taxonomy(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + # Setting taxonomy to be system defined + self.small_taxonomy.taxonomy_class = SystemDefinedTaxonomy + self.small_taxonomy.save() + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_create_tag_in_taxonomy_with_invalid_parent_tag(self): + self.client.force_authenticate(user=self.staff) + invalid_parent_tag = "Invalid Tag" + new_tag_value = "New Child Tag" + + create_data = { + "tag": new_tag_value, + "parent_tag_value": invalid_parent_tag, + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_create_tag_in_taxonomy_with_parent_tag_in_other_taxonomy(self): + self.client.force_authenticate(user=self.staff) + tag_in_other_taxonomy = Tag.objects.get(id=1) + new_tag_value = "New Child Tag" + + create_data = { + "tag": new_tag_value, + "parent_tag_value": tag_in_other_taxonomy.value, + } + + response = self.client.post( + self.large_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_create_tag_in_taxonomy_with_already_existing_value(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_201_CREATED + + # Make request again with the same Tag value after it was created + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_update_tag_in_taxonomy_while_loggedout(self): + updated_tag_value = "Updated Tag" + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + update_data = { + "tag": existing_tag.value, + "updated_tag_value": updated_tag_value + } + + # Test updating using the PUT method + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_update_tag_in_taxonomy_without_permission(self): + self.client.force_authenticate(user=self.user) + updated_tag_value = "Updated Tag" + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + update_data = { + "tag": existing_tag.value, + "updated_tag_value": updated_tag_value + } + + # Test updating using the PUT method + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_update_tag_in_taxonomy_with_different_methods(self): + self.client.force_authenticate(user=self.staff) + updated_tag_value = "Updated Tag" + updated_tag_value_2 = "Updated Tag 2" + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + update_data = { + "tag": existing_tag.value, + "updated_tag_value": updated_tag_value + } + + # Test updating using the PUT method + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + data = response.data + + # Check that Tag value got updated + self.assertEqual(data.get("id"), existing_tag.id) + self.assertEqual(data.get("value"), updated_tag_value) + self.assertEqual(data.get("taxonomy_id"), self.small_taxonomy.pk) + self.assertEqual(data.get("parent_id"), existing_tag.parent) + self.assertEqual(data.get("external_id"), existing_tag.external_id) + + # Test updating using the PATCH method + update_data["tag"] = updated_tag_value # Since the value changed + update_data["updated_tag_value"] = updated_tag_value_2 + response = self.client.patch( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + data = response.data + + # Check the Tag value got updated again + self.assertEqual(data.get("id"), existing_tag.id) + self.assertEqual(data.get("value"), updated_tag_value_2) + self.assertEqual(data.get("taxonomy_id"), self.small_taxonomy.pk) + self.assertEqual(data.get("parent_id"), existing_tag.parent) + self.assertEqual(data.get("external_id"), existing_tag.external_id) + + def test_update_tag_in_taxonomy_reflects_changes_in_object_tags(self): + self.client.force_authenticate(user=self.staff) + + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + # Setup ObjectTags + # _value=existing_tag.value + object_tag_1 = ObjectTag.objects.create( + object_id="abc", taxonomy=self.small_taxonomy, tag=existing_tag + ) + object_tag_2 = ObjectTag.objects.create( + object_id="def", taxonomy=self.small_taxonomy, tag=existing_tag + ) + object_tag_3 = ObjectTag.objects.create( + object_id="ghi", taxonomy=self.small_taxonomy, tag=existing_tag + ) + + assert object_tag_1.value == existing_tag.value + assert object_tag_2.value == existing_tag.value + assert object_tag_3.value == existing_tag.value + + updated_tag_value = "Updated Tag" + update_data = { + "tag": existing_tag.value, + "updated_tag_value": updated_tag_value + } + + # Test updating using the PUT method + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + data = response.data + + # Check that Tag value got updated + self.assertEqual(data.get("id"), existing_tag.id) + self.assertEqual(data.get("value"), updated_tag_value) + self.assertEqual(data.get("taxonomy_id"), self.small_taxonomy.pk) + self.assertEqual(data.get("parent_id"), existing_tag.parent) + self.assertEqual(data.get("external_id"), existing_tag.external_id) + + # Check that the ObjectTags got updated as well + object_tag_1.refresh_from_db() + self.assertEqual(object_tag_1.value, updated_tag_value) + object_tag_2.refresh_from_db() + self.assertEqual(object_tag_2.value, updated_tag_value) + object_tag_3.refresh_from_db() + self.assertEqual(object_tag_3.value, updated_tag_value) + + def test_update_tag_in_taxonomy_with_invalid_tag(self): + self.client.force_authenticate(user=self.staff) + updated_tag_value = "Updated Tag" + + update_data = { + "tag": 919191, + "updated_tag_value": updated_tag_value + } + + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_update_tag_in_taxonomy_with_tag_in_other_taxonomy(self): + self.client.force_authenticate(user=self.staff) + updated_tag_value = "Updated Tag" + tag_in_other_taxonomy = Tag.objects.get(id=1) + + update_data = { + "tag": tag_in_other_taxonomy.value, + "updated_tag_value": updated_tag_value + } + + response = self.client.put( + self.large_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_update_tag_in_taxonomy_with_no_tag_value_provided(self): + self.client.force_authenticate(user=self.staff) + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + update_data = { + "tag": existing_tag.value + } + + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_update_tag_in_invalid_taxonomy(self): + self.client.force_authenticate(user=self.staff) + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + updated_tag_value = "Updated Tag" + update_data = { + "tag": existing_tag.value, + "updated_tag_value": updated_tag_value + } + + invalid_taxonomy_url = TAXONOMY_TAGS_URL.format(pk=919191) + response = self.client.put( + invalid_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_delete_single_tag_from_taxonomy_while_loggedout(self): + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [existing_tag.value], + "with_subtags": True + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_delete_single_tag_from_taxonomy_without_permission(self): + self.client.force_authenticate(user=self.user) + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [existing_tag.value], + "with_subtags": True + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_single_tag_from_taxonomy(self): + self.client.force_authenticate(user=self.staff) + + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [existing_tag.value], + "with_subtags": True + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + # Check that Tag no longer exists + with self.assertRaises(Tag.DoesNotExist): + existing_tag.refresh_from_db() + + def test_delete_multiple_tags_from_taxonomy(self): + self.client.force_authenticate(user=self.staff) + + # Get Tags that will be deleted + existing_tags = self.small_taxonomy.tag_set.filter(parent=None)[:3] + + delete_data = { + "tags": [existing_tag.value for existing_tag in existing_tags], + "with_subtags": True + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + # Check that Tags no longer exists + for existing_tag in existing_tags: + with self.assertRaises(Tag.DoesNotExist): + existing_tag.refresh_from_db() + + def test_delete_tag_with_subtags_should_fail_without_flag_passed(self): + self.client.force_authenticate(user=self.staff) + + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [existing_tag.value] + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_delete_tag_in_invalid_taxonomy(self): + self.client.force_authenticate(user=self.staff) + + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [existing_tag.value] + } + + invalid_taxonomy_url = TAXONOMY_TAGS_URL.format(pk=919191) + response = self.client.delete( + invalid_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_delete_tag_in_taxonomy_with_invalid_tag(self): + self.client.force_authenticate(user=self.staff) + + delete_data = { + "tags": ["Invalid Tag"] + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_delete_tag_with_tag_in_other_taxonomy(self): + self.client.force_authenticate(user=self.staff) + + # Get Tag in other Taxonomy + tag_in_other_taxonomy = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tags": [tag_in_other_taxonomy.value] + } + + response = self.client.delete( + self.large_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_delete_tag_in_taxonomy_without_subtags(self): + self.client.force_authenticate(user=self.staff) + + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(children__isnull=True).first() + + delete_data = { + "tags": [existing_tag.value] + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_200_OK + + # Check that Tag no longer exists + with self.assertRaises(Tag.DoesNotExist): + existing_tag.refresh_from_db()