Skip to content

Commit

Permalink
feat: export tagged course as csv
Browse files Browse the repository at this point in the history
  • Loading branch information
rpenido committed Jan 25, 2024
1 parent 5838d68 commit 52bc665
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 9 deletions.
159 changes: 156 additions & 3 deletions openedx/core/djangoapps/content_tagging/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@
Content Tagging APIs
"""
from __future__ import annotations
from typing import TYPE_CHECKING

import csv
from itertools import groupby
from io import StringIO

import openedx_tagging.core.tagging.api as oel_tagging
from django.db.models import Q, QuerySet, Exists, OuterRef
from openedx_tagging.core.tagging.models import Taxonomy
from organizations.models import Organization
from opaque_keys.edx.keys import CourseKey, UsageKey
from openedx_tagging.core.tagging.models import ObjectTag

from xmodule.modulestore.django import modulestore

from .models import ContentObjectTag, TaxonomyOrg
from .types import ContentKey

if TYPE_CHECKING:
from openedx_tagging.core.tagging.models import Taxonomy
from xblock.runtime import Runtime
from organizations.models import Organization
from .types import ContentKey


def create_taxonomy(
Expand Down Expand Up @@ -141,6 +153,8 @@ def get_content_tags(
)


# FixMe: The following method (tag_content_object) is only used in tasks.py for auto-tagging. To tag object we are
# using oel_tagging.tag_object and checking permissions via rule overrides.
def tag_content_object(
object_key: ContentKey,
taxonomy: Taxonomy,
Expand Down Expand Up @@ -175,6 +189,145 @@ def tag_content_object(
return get_content_tags(object_key, taxonomy_id=taxonomy.id)


def export_content_object_children_tags(
course_key_str: str,
) -> str:
"""
Generates a CSV file with the tags for all the children of a course.
"""
def _get_course_children_tags(course_key: CourseKey) -> tuple[dict[str, dict[int, list[str]]], dict[int, str]]:
"""
Returns a tuple with a dictionary of object tags for all blocks of a course,
grouping by the block id and taxonomy id; and a dictionary of taxonomy ids and names.
I.e.
// result
{
// Block with id block-v1:edX+DemoX+Demo_Course+type@chapter+block@chapter
"block-v1:edX+DemoX+Demo_Course+type@chapter+block@chapter": {
// ObjectTags from Taxonomy with id 1
"1": (
"Tag1",
"Tag2",
...
),
// ObjectTags from Taxonomy with id 2
"2": (
"Tag3",
...
),
...
},
// Block with id block-v1:edX+DemoX+Demo_Course+type@sequential+block@sequential
"block-v1:edX+DemoX+Demo_Course+type@sequential+block@sequential": {
// ObjectTags from Taxonomy with id 1
"1": (
"Tag2",
...
),
...
},
}
// taxonomies
{
"1": "Taxonomy A",
"2": "Taxonomy B",
...
}
"""
block_id_prefix = str(course_key).replace("course-v1:", "block-v1:", 1)
block_tags_records = ObjectTag.objects.filter(object_id__startswith=block_id_prefix).all()

result: dict[str, dict[int, list[str]]] = {}
taxonomies: dict[int, str] = {}

for object_id, block_tags in groupby(block_tags_records, lambda x: x.object_id):
result[object_id] = {}
for taxonomy_id, taxonomy_tags in groupby(block_tags, lambda x: x.tag.taxonomy_id):
object_tag_list = list(taxonomy_tags)
result[object_id][taxonomy_id] = [
# If the tag is not found (deleted or freeText), use the objecttag._name instead
objecttag.tag.value if objecttag.tag else objecttag.name
for objecttag in object_tag_list
]

if taxonomy_id not in taxonomies:
taxonomies[taxonomy_id] = object_tag_list[0].tag.taxonomy.name

return result, taxonomies

def _generate_csv(
header: dict[str, str],
blocks: list[tuple[int, UsageKey]],
tags: dict[str, dict[int, list[str]]],
taxonomies: dict[int, str],
runtime: Runtime,
) -> str:
"""
Receives the blocks, tags and taxonomies and returns a CSV string
"""

with StringIO() as csv_buffer:
csv_writer = csv.DictWriter(csv_buffer, fieldnames=header.keys())
csv_writer.writerow(header)

# Iterate over the blocks stack and write the block rows
while blocks:
level, block_id = blocks.pop()
# ToDo: fix block typing
block = runtime.get_block(block_id)

block_data = {
"name": level * " " + block.display_name_with_default,
"type": block.category,
"id": block_id
}

block_id_str = str(block_id)

# Add the tags for each taxonomy
for taxonomy_id in taxonomies:
if block_id_str in tags and taxonomy_id in tags[block_id_str]:
block_data[f"taxonomy_{taxonomy_id}"] = ", ".join(tags[block_id_str][taxonomy_id])

csv_writer.writerow(block_data)

# Add children to the stack
if block.has_children:
for child_id in block.children:
blocks.append((level + 1, child_id))

return csv_buffer.getvalue()

store = modulestore()
course_key = CourseKey.from_string(course_key_str)
if not course_key.is_course:
raise ValueError(f"Invalid course key {course_key_str}")

# ToDo: fix course typing
course = store.get_course(course_key)
if course is None:
raise ValueError(f"Course {course_key} not found")

tags, taxonomies = _get_course_children_tags(course_key)

blocks = []
# Add children to the stack
if course.has_children:
for child_id in course.children:
blocks.append((0, child_id))

header = {"name": "Name", "type": "Type", "id": "ID"}

# Prepare the header for the taxonomies
# We are using the taxonomy id as the field name to avoid collisions
for taxonomy_id, name in taxonomies.items():
header[f"taxonomy_{taxonomy_id}"] = name

return _generate_csv(header, blocks, tags, taxonomies, course.runtime)


# Expose the oel_tagging APIs

get_taxonomy = oel_tagging.get_taxonomy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,10 @@ class Meta:
model = TaxonomySerializer.Meta.model
fields = TaxonomySerializer.Meta.fields + ["orgs", "all_orgs"]
read_only_fields = ["orgs", "all_orgs"]


class ContentObjectChildrenTagsExportQueryParamsSerializer(serializers.Serializer): # pylint: disable=abstract-method
"""
Serializer for the query params for the export objecttags GET view
"""
download = serializers.BooleanField(required=False, default=False)
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
from openedx.core.djangolib.testing.utils import skip_unless_cms
from openedx.core.lib import blockstore_api

from ....tests.test_api import TaggedCourseMixin

User = get_user_model()

TAXONOMY_ORG_LIST_URL = "/api/content_tagging/v1/taxonomies/"
TAXONOMY_ORG_DETAIL_URL = "/api/content_tagging/v1/taxonomies/{pk}/"
TAXONOMY_ORG_UPDATE_ORG_URL = "/api/content_tagging/v1/taxonomies/{pk}/orgs/"
OBJECT_TAG_UPDATE_URL = "/api/content_tagging/v1/object_tags/{object_id}/?taxonomy={taxonomy_id}"
OBJECT_TAGS_EXPORT_URL = "/api/content_tagging/v1/object_tags/{object_id}/export/"
OBJECT_TAGS_URL = "/api/content_tagging/v1/object_tags/{object_id}/"
TAXONOMY_TEMPLATE_URL = "/api/content_tagging/v1/taxonomies/import/{filename}"
TAXONOMY_CREATE_IMPORT_URL = "/api/content_tagging/v1/taxonomies/import/"
Expand Down Expand Up @@ -1624,6 +1627,63 @@ def test_object_tags_query_count(self):
assert response.data[object_id]["taxonomies"][0]["tags"] == expected_tags


@skip_unless_cms
@ddt.ddt
class TestContentObjectChildrenExportView(TaggedCourseMixin, APITestCase): # type: ignore[misc]
"""
Tests exporting course children with tags
"""
def setUp(self):
super().setUp()
self.user = User.objects.create(
username="user",
email="[email protected]",
)
self.staff = User.objects.create(
username="staff",
email="[email protected]",
is_staff=True,
)

self.staffA = User.objects.create(
username="staffA",
email="[email protected]",
)
update_org_role(self.staff, OrgStaffRole, self.staffA, [self.orgA.short_name])

@ddt.data(
"staff",
"staffA",
)
def test_export_course(self, user_attr) -> None:
url = OBJECT_TAGS_EXPORT_URL.format(object_id=str(self.course.id))

user = getattr(self, user_attr)
self.client.force_authenticate(user=user)
response = self.client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response.headers['Content-Type'] == 'text'
assert int(response.headers['Content-Length']) > 0
assert response.content == self.expected_csv.encode("utf-8")

def test_export_course_anoymous_unauthorized(self) -> None:
url = OBJECT_TAGS_EXPORT_URL.format(object_id=str(self.course.id))
response = self.client.get(url)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

def test_export_course_user_forbidden(self) -> None:
url = OBJECT_TAGS_EXPORT_URL.format(object_id=str(self.course.id))
self.client.force_authenticate(user=self.user)
response = self.client.get(url)
assert response.status_code == status.HTTP_403_FORBIDDEN

def test_export_course_invalid_id(self) -> None:
url = OBJECT_TAGS_EXPORT_URL.format(object_id="invalid")
self.client.force_authenticate(user=self.staff)
response = self.client.get(url)
assert response.status_code == status.HTTP_400_BAD_REQUEST


@skip_unless_cms
@ddt.ddt
class TestDownloadTemplateView(APITestCase):
Expand All @@ -1635,20 +1695,20 @@ class TestDownloadTemplateView(APITestCase):
("template.json", "application/json"),
)
@ddt.unpack
def test_download(self, filename, content_type):
def test_download(self, filename, content_type) -> None:
url = TAXONOMY_TEMPLATE_URL.format(filename=filename)
response = self.client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response.headers['Content-Type'] == content_type
assert response.headers['Content-Disposition'] == f'attachment; filename="{filename}"'
assert int(response.headers['Content-Length']) > 0

def test_download_not_found(self):
def test_download_not_found(self) -> None:
url = TAXONOMY_TEMPLATE_URL.format(filename="template.txt")
response = self.client.get(url)
assert response.status_code == status.HTTP_404_NOT_FOUND

def test_download_method_not_allowed(self):
def test_download_method_not_allowed(self) -> None:
url = TAXONOMY_TEMPLATE_URL.format(filename="template.txt")
response = self.client.post(url)
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
Expand Down
62 changes: 60 additions & 2 deletions openedx/core/djangoapps/content_tagging/rest_api/v1/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""
Tagging Org API Views
"""
from django.db.models.query import QuerySet
from django.http import HttpResponse
from openedx_tagging.core.tagging import rules as oel_tagging_rules
from openedx_tagging.core.tagging.models import ObjectTag
from openedx_tagging.core.tagging.rest_api.v1.views import ObjectTagView, TaxonomyView
from rest_framework import status
from rest_framework.decorators import action
Expand All @@ -11,14 +14,20 @@

from ...api import (
create_taxonomy,
export_content_object_children_tags,
get_taxonomy,
get_taxonomies,
get_taxonomies_for_org,
get_unassigned_taxonomies,
set_taxonomy_orgs,
)
from ...rules import get_admin_orgs
from .serializers import TaxonomyOrgListQueryParamsSerializer, TaxonomyOrgSerializer, TaxonomyUpdateOrgBodySerializer
from .serializers import (
ContentObjectChildrenTagsExportQueryParamsSerializer,
TaxonomyOrgListQueryParamsSerializer,
TaxonomyOrgSerializer,
TaxonomyUpdateOrgBodySerializer,
)
from .filters import ObjectTagTaxonomyOrgFilterBackend, UserOrgFilterBackend


Expand Down Expand Up @@ -130,8 +139,57 @@ def orgs(self, request, **_kwargs) -> Response:
class ObjectTagOrgView(ObjectTagView):
"""
View to create and retrieve ObjectTags for a provided Object ID (object_id).
This view extends the ObjectTagView to add Organization filters for the results.
This view extends the ObjectTagView to add Organization filters for the results and
new actions like: export.
Refer to ObjectTagView docstring for usage details.
"""
filter_backends = [ObjectTagTaxonomyOrgFilterBackend]

def get_queryset(self):
if self.action == "retrieve":
return super().get_queryset()

# For other actions, return a dummy queryset only for permission checking
dummy_queryset = QuerySet(model=ObjectTag)

return dummy_queryset

@action(detail=True, url_path="export", methods=["get"])
def export_children_object_tags(self, request: Request, **kwargs) -> HttpResponse:
"""
Export all the object tags for the given object_id children.
"""
object_id: str = kwargs.get('object_id', None)

query_params = ContentObjectChildrenTagsExportQueryParamsSerializer(
data=request.query_params.dict()
)
query_params.is_valid(raise_exception=True)

# Check if the user has permission to view object tags for this object_id
try:
if not self.request.user.has_perm(
"oel_tagging.view_objecttag",
# The obj arg expects a model, but we are passing an object
oel_tagging_rules.ObjectTagPermissionItem(taxonomy=None, object_id=object_id), # type: ignore[arg-type]
):
raise PermissionDenied(
"You do not have permission to view object tags for this object_id."
)
except ValueError as e:
raise ValidationError from e

if query_params.data.get("download"):
content_type = "text/csv"
else:
content_type = "text"

tags = export_content_object_children_tags(object_id)

if query_params.data.get("download"):
response = HttpResponse(tags.encode('utf-8'), content_type=content_type)
response["Content-Disposition"] = f'attachment; filename="{object_id}_tags.csv"'
return response

return HttpResponse(tags, content_type=content_type)
Loading

0 comments on commit 52bc665

Please sign in to comment.