diff --git a/app/general/admin.py b/app/general/admin.py index 50742ef3..392d6a79 100644 --- a/app/general/admin.py +++ b/app/general/admin.py @@ -4,7 +4,7 @@ from django.utils.translation import gettext as _ from simple_history.admin import SimpleHistoryAdmin -from general.service.extract_text import GetTextError, GetTextFromPDF +from general.service.extract_text import GetTextError, pdf_to_text from .models import DocumentFile, Institution, Language, Project, Subject @@ -34,29 +34,26 @@ def clean(self): file_type = magic.from_buffer(uploaded_file.read(), mime=True) if file_type != "application/pdf": self.add_error("uploaded_file", _("Only PDF files are allowed.")) - - try: - # Extract text from PDF file - cleaned_data["document_data"] = GetTextFromPDF(uploaded_file).to_text() - - except GetTextError: - return self.add_error( - "uploaded_file", _("The uploaded file is corrupted or not fully downloaded.") - ) - cleaned_data["mime_type"] = file_type - uploaded_file.seek(0) # Reset file pointer after read + limit = 10 * 1024 * 1024 + if uploaded_file.size and uploaded_file.size > limit: + self.add_error("uploaded_file", _("File size must not exceed 10MB.")) + if not self.has_error("uploaded_file"): + # Don't parse if validation above failed + try: + cleaned_data["document_data"] = pdf_to_text(uploaded_file) + except GetTextError: + return self.add_error( + "uploaded_file", + _("The uploaded file is corrupted or not fully downloaded."), + ) + uploaded_file.seek(0) # Reset file pointer after read if not url and not uploaded_file: self.add_error("url", _("Either URL or uploaded file must be provided.")) self.add_error("uploaded_file", _("Either URL or uploaded file must be provided.")) - if uploaded_file: - limit = 10 * 1024 * 1024 - if uploaded_file.size and uploaded_file.size > limit: - self.add_error("uploaded_file", _("File size must not exceed 10MB.")) - return cleaned_data diff --git a/app/general/management/commands/dev_pdf_mass_upload.py 
b/app/general/management/commands/dev_pdf_mass_upload.py deleted file mode 100644 index 32543e5c..00000000 --- a/app/general/management/commands/dev_pdf_mass_upload.py +++ /dev/null @@ -1,109 +0,0 @@ -import os -import random -import shutil - -import magic -from django.core.files.base import ContentFile -from django.core.management.base import BaseCommand - -from general.models import DocumentFile -from general.service.extract_text import GetTextError, GetTextFromPDF - - -class Command(BaseCommand): - help = "Mass PDF uploader for testing purposes." - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.dir_main = "/pdf_uploads/" - self.dir_completed = "/pdf_upload_completed/completed/" - self.dir_error = "/pdf_upload_completed/error/" - - def handle(self, *args, **options): - print("Mass file uploader for testing purposes.") - - self.create_directory(self.dir_completed) - self.create_directory(self.dir_error) - - for root, dirs, files in os.walk(self.dir_main): - for file in files: - file_path = os.path.join(root, file) - - # Check if the file is a PDF file and save the data - self.handle_file(file_path, file) - - def handle_file(self, file_path, file): - # Get the file type - file_type = magic.from_file(file_path, mime=True) - - # Check if the file is a PDF file - directory = self.check_file_type(file_type) - self.print_pdf_file(file) - - # If file is a PDF file it saves the data and moves the file to the completed directory - if directory: - data = { - "title": file.strip(), - "file": file.strip(), - "uploaded_file": file_path, - } - # Save the data to the database and uploads the file - self.save_data(data) - - # Move the file to the completed directory - self.move_file(file_path, file, directory) - - # If the file is not a PDF file, print an error message and move the file to the error directory - else: - self.print_error() - # Move the file to the error directory - self.move_file(file_path, file, self.dir_error) - - def 
check_file_type(self, file_type): - return self.dir_completed if file_type == "application/pdf" else None - - def move_file(self, file_path, file, directory): - if not os.path.isfile(directory + file): - shutil.move(file_path, directory) - else: - print( - f"The file '{os.path.basename(directory + file)}' already exists in the destination directory." - ) - - def print_pdf_file(self, file): - print("\033[92m" + file + "\033[0m") - - def print_error(self): - print("\033[91m" + "Only PDF files are allowed" + "\033[0m") - - def save_data(self, data): - # Generate a random number for the institution ID - random_number = random.randint(1, 20) - - # Read the uploaded file data - with open(data["uploaded_file"], "rb") as f: - get_content_file = f.read() - - content_file = ContentFile(get_content_file, name=data["title"]) - - try: - document_data = GetTextFromPDF(data["uploaded_file"]).to_text() - - instance = DocumentFile( - title=data["title"], - document_data=document_data, # Scraps the PDF file and extracts the text - uploaded_file=content_file, - document_type="Glossary", - institution_id=random_number, - ) - instance.save() - - except GetTextError as e: - print(f"Error: {e}") - return - - def create_directory(self, directory): - try: - os.makedirs(directory, exist_ok=True) - except OSError as error: - print(f"Directory '{directory}' can not be created. Error: {error}") diff --git a/app/general/management/commands/import_documents.py b/app/general/management/commands/import_documents.py new file mode 100644 index 00000000..9f001543 --- /dev/null +++ b/app/general/management/commands/import_documents.py @@ -0,0 +1,54 @@ +# TODO: +# - Provide better command-line parameters for control, e.g. 
+# - import for given institution +# - associate with specific language(s)/subject(s) +# - make usable outside Docker + +import os +import random + +import magic +from django.core.files.base import ContentFile +from django.core.management.base import BaseCommand + +from general.models import DocumentFile +from general.service.extract_text import GetTextError, pdf_to_text + + +class Command(BaseCommand): + help = "Mass PDF uploader for testing purposes" + + def add_arguments(self, parser): + parser.add_argument("directory", help="Directory with files to import") + + def handle(self, *args, **options): + for root, _dirs, files in os.walk(options["directory"]): + for file in files: + if os.path.splitext(file)[1].lower() != ".pdf": # also accept ".PDF" + continue + file_path = os.path.join(root, file) + self.handle_file(file_path, file) + + def handle_file(self, file_path, file_name): + self.stdout.write(file_name) + file_type = magic.from_file(file_path, mime=True) # check the content, not just the extension + if file_type == "application/pdf": + self.save_data(file_path, file_name) + else: + self.stderr.write("Only PDF files are allowed") + + def save_data(self, file_path, file_name): + with open(file_path, "rb") as f: + content_file = ContentFile(f.read(), name=file_name) + + try: + instance = DocumentFile( + title=file_name, + document_data=pdf_to_text(file_path), + uploaded_file=content_file, + document_type="Glossary", + institution_id=random.randint(1, 20), # test data only; presumably institutions 1..20 exist (confirm) + ) + instance.save() + except GetTextError as e: + self.stderr.write(f"Error: {e}") diff --git a/app/general/service/extract_text.py b/app/general/service/extract_text.py index 613ce038..a9c65ab1 100644 --- a/app/general/service/extract_text.py +++ b/app/general/service/extract_text.py @@ -1,29 +1,30 @@ -from pypdf import PdfReader -from pypdf.errors import PdfStreamError +# TODO: +# - remove unneeded whitespace (e.g. multiple consecutive spaces) +# - remove unprintable characters, or replacing them with some symbol +# character so that excerpts look better.
+# - consider removing a few too common words, like single digits "1", etc. +# or maybe anything that occurs too frequently in the full-text index that +# could cause a full-table scan. +# - consider multilingual stemming to enhance chances of success in a multi- +# lingual setup... hard! class GetTextError(Exception): pass -class GetTextFromPDF: - def __init__(self, uploaded_file): - self.uploaded_file = uploaded_file +def pdf_to_text(pdf): + """Return the text of every page of `pdf`, joined by spaces; raise GetTextError on failure.""" + from pypdf import PdfReader # imported lazily, as PDF parsing is not needed frequently + from pypdf.errors import PdfStreamError - def to_text(self): - if self.uploaded_file: - text_list = [] - # Read the PDF file and extract text - try: - reader = PdfReader(self.uploaded_file) - for page in reader.pages: - text_list.append(page.extract_text()) + text_list = [] + try: + for page in PdfReader(pdf).pages: + text_list.append(page.extract_text()) + except PdfStreamError as err: + raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") from err + except Exception as err: + raise GetTextError("Error during text extraction from PDF file.") from err - get_pdf_text = " ".join(text_list) - - return str(get_pdf_text) - - except PdfStreamError: - raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") - except Exception: - raise GetTextError("Error during text extraction from PDF file.") + return " ".join(text_list) diff --git a/app/general/tests/test_dev_mass_upload.py b/app/general/tests/test_dev_mass_upload.py deleted file mode 100644 index 85c7bbe0..00000000 --- a/app/general/tests/test_dev_mass_upload.py +++ /dev/null @@ -1,69 +0,0 @@ -import os -import random -import unittest -from unittest.mock import MagicMock - -from faker import Faker - -from general.management.commands.dev_pdf_mass_upload import Command -from general.models import DocumentFile, Institution - - -class TestHandleFile(unittest.TestCase): - def setUp(self): - self.command = Command() - self.command.check_file_type = MagicMock() -
self.command.move_file = MagicMock() - self.command.print_error = MagicMock() - self.command.print_pdf_file = MagicMock() - self.command.save_data = MagicMock() - self.test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") - self.test_file = self.test_dir + "Lorem.pdf" - self.fake = Faker() - - def test_handle_file_pdf(self): - self.command.check_file_type.return_value = self.test_dir - self.command.handle_file(self.test_file, self.test_file) - self.command.check_file_type.assert_called_once() - self.command.move_file.assert_called_once() - self.command.save_data.assert_called_once() - self.command.print_pdf_file.assert_called_once() - self.command.print_error.assert_not_called() - - def test_handle_file_non_pdf(self): - self.command.check_file_type.return_value = None - self.command.handle_file(self.test_file, self.test_file) - self.command.check_file_type.assert_called_once() - self.command.move_file.assert_called_once() - self.command.save_data.assert_not_called() - self.command.print_pdf_file.assert_called_once() - self.command.print_error.assert_called_once() - - def test_check_file_type_pdf(self): - self.assertNotEqual(self.command.check_file_type("application/pdf"), self.test_dir) - - def test_save_data(self): - self.command = Command() - # Create some Institutions instances for testing - for i in range(1, 30): - random_number = random.randint(1, 1000) - - Institution.objects.create( - id=i, - name=str(random_number) + "_" + self.fake.company(), - abbreviation=str(random_number) + "_" + self.fake.company_suffix(), - url=str(random_number) + "_" + self.fake.url(), - email=str(random_number) + "_" + self.fake.company_email(), - logo="", - ) - - data = { - "title": "Test file", - "file": "Test file", - "uploaded_file": self.test_file, - } - - self.command.save_data(data) - - document_file = DocumentFile.objects.get(title="Test file") - self.assertEqual(document_file.title, "Test file") diff --git a/app/general/tests/test_extract_text_service.py 
b/app/general/tests/test_extract_text_service.py index c15b3b35..fa1e7ecf 100644 --- a/app/general/tests/test_extract_text_service.py +++ b/app/general/tests/test_extract_text_service.py @@ -1,30 +1,18 @@ import os import unittest -from general.service.extract_text import GetTextFromPDF +from general.service.extract_text import pdf_to_text class TestExtractTextService(unittest.TestCase): def setUp(self): test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") - self.file_mock = test_dir + "/Lorem.pdf" - - def test_in_text(self): - with open(self.file_mock, "rb") as file: - pypdf = GetTextFromPDF(file) - - result = pypdf.to_text().strip() - - words = result.split() - - self.assertIn("turpis.", words) - - def test_not_in_text(self): - with open(self.file_mock, "rb") as file: - pypdf = GetTextFromPDF(file) - - result = pypdf.to_text().strip() - - words = result.split() - - self.assertNotIn("notintext.", words) + self.file_name = os.path.join(test_dir, "Lorem.pdf") + + def test_text_extraction(self): + with open(self.file_name, "rb") as file: + text = pdf_to_text(file) + self.assertIn("fermentum turpis.", text) + self.assertNotIn("notintext.", text) + self.assertGreater(len(text), 1470, "Too little text extracted") + self.assertGreater(len(text.split()), 220, "Too few words (spaces) extracted") diff --git a/app/general/tests/test_import_documents.py b/app/general/tests/test_import_documents.py new file mode 100644 index 00000000..8725d085 --- /dev/null +++ b/app/general/tests/test_import_documents.py @@ -0,0 +1,62 @@ +import os +import random +import unittest +from unittest.mock import MagicMock, patch + +from faker import Faker + +from general.management.commands.import_documents import Command +from general.models import DocumentFile, Institution + + +class TestHandleFile(unittest.TestCase): + """Tests for the import_documents management command.""" + def setUp(self): + self.command = Command() + self.command.save_data = MagicMock() + self.test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") +
self.test_file = os.path.join(self.test_dir, "Lorem.pdf") + self.name = "Test file" + self.fake = Faker() + + def tearDown(self): + """Remove the uploaded file that test_save_data may have stored on disk.""" + try: + document_file = DocumentFile.objects.get(title=self.name) + path = document_file.uploaded_file.path + if os.path.isfile(path): + os.remove(path) + except DocumentFile.DoesNotExist: + pass + + def test_handle_file_pdf(self): + """A file detected as PDF is passed on to save_data.""" + self.command.handle_file(self.test_file, self.test_file) + self.command.save_data.assert_called_once() + + def test_handle_file_non_pdf(self): + """A file whose detected MIME type is not PDF never reaches save_data.""" + with patch("magic.from_file") as from_file: + from_file.return_value = None + self.command.handle_file(self.test_file, self.test_file) + self.command.save_data.assert_not_called() + + def test_save_data(self): + """save_data stores a DocumentFile holding the extracted text.""" + command = Command() + # Create Institution rows with ids 1..20 for testing + for i in range(1, 21): + prefix = random.randint(1, 1000) + Institution.objects.create( + id=i, + name=f"{prefix}_{self.fake.company()}", + abbreviation=f"{prefix}_{self.fake.company_suffix()}", + url=f"{prefix}_{self.fake.url()}", + email=f"{prefix}_{self.fake.company_email()}", + logo="", + ) + + command.save_data(self.test_file, self.name) + document_file = DocumentFile.objects.get(title=self.name) + self.assertEqual(document_file.title, self.name) + self.assertIn("Lorem ipsum dolor", document_file.document_data)