diff --git a/app/general/admin.py b/app/general/admin.py index 50742ef3..392d6a79 100644 --- a/app/general/admin.py +++ b/app/general/admin.py @@ -4,7 +4,7 @@ from django.utils.translation import gettext as _ from simple_history.admin import SimpleHistoryAdmin -from general.service.extract_text import GetTextError, GetTextFromPDF +from general.service.extract_text import GetTextError, pdf_to_text from .models import DocumentFile, Institution, Language, Project, Subject @@ -34,29 +34,26 @@ def clean(self): file_type = magic.from_buffer(uploaded_file.read(), mime=True) if file_type != "application/pdf": self.add_error("uploaded_file", _("Only PDF files are allowed.")) - - try: - # Extract text from PDF file - cleaned_data["document_data"] = GetTextFromPDF(uploaded_file).to_text() - - except GetTextError: - return self.add_error( - "uploaded_file", _("The uploaded file is corrupted or not fully downloaded.") - ) - cleaned_data["mime_type"] = file_type - uploaded_file.seek(0) # Reset file pointer after read + limit = 10 * 1024 * 1024 + if uploaded_file.size and uploaded_file.size > limit: + self.add_error("uploaded_file", _("File size must not exceed 10MB.")) + if not self.has_error("uploaded_file"): + # Don't parse if validation above failed + try: + cleaned_data["document_data"] = pdf_to_text(uploaded_file) + except GetTextError: + return self.add_error( + "uploaded_file", + _("The uploaded file is corrupted or not fully downloaded."), + ) + uploaded_file.seek(0) # Reset file pointer after read if not url and not uploaded_file: self.add_error("url", _("Either URL or uploaded file must be provided.")) self.add_error("uploaded_file", _("Either URL or uploaded file must be provided.")) - if uploaded_file: - limit = 10 * 1024 * 1024 - if uploaded_file.size and uploaded_file.size > limit: - self.add_error("uploaded_file", _("File size must not exceed 10MB.")) - return cleaned_data diff --git a/app/general/management/commands/dev_pdf_mass_upload.py 
b/app/general/management/commands/dev_pdf_mass_upload.py index 32543e5c..9f001543 100644 --- a/app/general/management/commands/dev_pdf_mass_upload.py +++ b/app/general/management/commands/dev_pdf_mass_upload.py @@ -1,109 +1,54 @@ +# TODO: +# - Provide better command-line parameters for control, e.g. +# - import for given institution +# - associate with specific language(s)/subject(s) +# - make usable outside Docker + import os import random -import shutil import magic from django.core.files.base import ContentFile from django.core.management.base import BaseCommand from general.models import DocumentFile -from general.service.extract_text import GetTextError, GetTextFromPDF +from general.service.extract_text import GetTextError, pdf_to_text class Command(BaseCommand): - help = "Mass PDF uploader for testing purposes." + help = "Mass PDF uploader for testing purposes" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.dir_main = "/pdf_uploads/" - self.dir_completed = "/pdf_upload_completed/completed/" - self.dir_error = "/pdf_upload_completed/error/" + def add_arguments(self, parser): + parser.add_argument("directory", help="Directory with files to import") def handle(self, *args, **options): - print("Mass file uploader for testing purposes.") - - self.create_directory(self.dir_completed) - self.create_directory(self.dir_error) - - for root, dirs, files in os.walk(self.dir_main): + for root, dirs, files in os.walk(options["directory"]): for file in files: + if not os.path.splitext(file)[1] == ".pdf": + continue file_path = os.path.join(root, file) - - # Check if the file is a PDF file and save the data self.handle_file(file_path, file) - def handle_file(self, file_path, file): - # Get the file type + def handle_file(self, file_path, file_name): + print(file_name) file_type = magic.from_file(file_path, mime=True) - - # Check if the file is a PDF file - directory = self.check_file_type(file_type) - self.print_pdf_file(file) - - # If file 
is a PDF file it saves the data and moves the file to the completed directory - if directory: - data = { - "title": file.strip(), - "file": file.strip(), - "uploaded_file": file_path, - } - # Save the data to the database and uploads the file - self.save_data(data) - - # Move the file to the completed directory - self.move_file(file_path, file, directory) - - # If the file is not a PDF file, print an error message and move the file to the error directory - else: - self.print_error() - # Move the file to the error directory - self.move_file(file_path, file, self.dir_error) - - def check_file_type(self, file_type): - return self.dir_completed if file_type == "application/pdf" else None - - def move_file(self, file_path, file, directory): - if not os.path.isfile(directory + file): - shutil.move(file_path, directory) + if file_type == "application/pdf": + self.save_data(file_path, file_name) else: - print( - f"The file '{os.path.basename(directory + file)}' already exists in the destination directory." 
- ) - - def print_pdf_file(self, file): - print("\033[92m" + file + "\033[0m") - - def print_error(self): - print("\033[91m" + "Only PDF files are allowed" + "\033[0m") - - def save_data(self, data): - # Generate a random number for the institution ID - random_number = random.randint(1, 20) + print("Only PDF files are allowed") - # Read the uploaded file data - with open(data["uploaded_file"], "rb") as f: - get_content_file = f.read() - - content_file = ContentFile(get_content_file, name=data["title"]) + def save_data(self, file_path, file_name): + with open(file_path, "rb") as f: + content_file = ContentFile(f.read(), name=file_name) try: - document_data = GetTextFromPDF(data["uploaded_file"]).to_text() - instance = DocumentFile( - title=data["title"], - document_data=document_data, # Scraps the PDF file and extracts the text + title=file_name, + document_data=pdf_to_text(file_path), uploaded_file=content_file, document_type="Glossary", - institution_id=random_number, + institution_id=random.randint(1, 20), ) instance.save() - except GetTextError as e: print(f"Error: {e}") - return - - def create_directory(self, directory): - try: - os.makedirs(directory, exist_ok=True) - except OSError as error: - print(f"Directory '{directory}' can not be created. Error: {error}") diff --git a/app/general/service/extract_text.py b/app/general/service/extract_text.py index 613ce038..a9c65ab1 100644 --- a/app/general/service/extract_text.py +++ b/app/general/service/extract_text.py @@ -1,29 +1,30 @@ -from pypdf import PdfReader -from pypdf.errors import PdfStreamError +# TODO: +# - remove unneeded whitespace (e.g. multiple consecutive spaces) +# - remove unprintable characters, or replace them with some symbol +# character so that excerpts look better. +# - consider removing a few too common words, like single digits "1", etc. +# or maybe anything that occurs too frequently in the full-text index that +# could cause a full-table scan. 
+# - consider multilingual stemming to enhance chances of success in a multi- +# lingual setup... hard! class GetTextError(Exception): pass -class GetTextFromPDF: - def __init__(self, uploaded_file): - self.uploaded_file = uploaded_file +def pdf_to_text(pdf): + # imports postponed, as they will normally not be needed frequently + from pypdf import PdfReader + from pypdf.errors import PdfStreamError - def to_text(self): - if self.uploaded_file: - text_list = [] - # Read the PDF file and extract text - try: - reader = PdfReader(self.uploaded_file) - for page in reader.pages: - text_list.append(page.extract_text()) + text_list = [] + try: + for page in PdfReader(pdf).pages: + text_list.append(page.extract_text()) + except PdfStreamError: + raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") + except Exception: + raise GetTextError("Error during text extraction from PDF file.") - get_pdf_text = " ".join(text_list) - - return str(get_pdf_text) - - except PdfStreamError: - raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") - except Exception: - raise GetTextError("Error during text extraction from PDF file.") + return " ".join(text_list) diff --git a/app/general/tests/test_dev_mass_upload.py b/app/general/tests/test_dev_mass_upload.py index 85c7bbe0..09a35038 100644 --- a/app/general/tests/test_dev_mass_upload.py +++ b/app/general/tests/test_dev_mass_upload.py @@ -1,7 +1,7 @@ import os import random import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from faker import Faker @@ -12,58 +12,46 @@ class TestHandleFile(unittest.TestCase): def setUp(self): self.command = Command() - self.command.check_file_type = MagicMock() - self.command.move_file = MagicMock() - self.command.print_error = MagicMock() - self.command.print_pdf_file = MagicMock() self.command.save_data = MagicMock() self.test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") self.test_file = 
self.test_dir + "Lorem.pdf" + self.name = "Test file" self.fake = Faker() + def tearDown(self): + try: + document_file = DocumentFile.objects.get(title=self.name) + path = document_file.uploaded_file.path + if os.path.isfile(path): + os.remove(path) + except DocumentFile.DoesNotExist: + pass + def test_handle_file_pdf(self): - self.command.check_file_type.return_value = self.test_dir self.command.handle_file(self.test_file, self.test_file) - self.command.check_file_type.assert_called_once() - self.command.move_file.assert_called_once() self.command.save_data.assert_called_once() - self.command.print_pdf_file.assert_called_once() - self.command.print_error.assert_not_called() def test_handle_file_non_pdf(self): - self.command.check_file_type.return_value = None - self.command.handle_file(self.test_file, self.test_file) - self.command.check_file_type.assert_called_once() - self.command.move_file.assert_called_once() + with patch("magic.from_file") as from_file: + from_file.return_value = None + self.command.handle_file(self.test_file, self.test_file) self.command.save_data.assert_not_called() - self.command.print_pdf_file.assert_called_once() - self.command.print_error.assert_called_once() - - def test_check_file_type_pdf(self): - self.assertNotEqual(self.command.check_file_type("application/pdf"), self.test_dir) def test_save_data(self): - self.command = Command() + command = Command() # Create some Institutions instances for testing - for i in range(1, 30): - random_number = random.randint(1, 1000) - + for i in range(1, 21): + id = random.randint(1, 1000) Institution.objects.create( id=i, - name=str(random_number) + "_" + self.fake.company(), - abbreviation=str(random_number) + "_" + self.fake.company_suffix(), - url=str(random_number) + "_" + self.fake.url(), - email=str(random_number) + "_" + self.fake.company_email(), + name=f"{id}_{self.fake.company()}", + abbreviation=f"{id}_{self.fake.company_suffix()}", + url=f"{id}_{self.fake.url()}", + 
email=f"{id}_{self.fake.company_email()}", logo="", ) - data = { - "title": "Test file", - "file": "Test file", - "uploaded_file": self.test_file, - } - - self.command.save_data(data) - - document_file = DocumentFile.objects.get(title="Test file") - self.assertEqual(document_file.title, "Test file") + command.save_data(self.test_file, self.name) + document_file = DocumentFile.objects.get(title=self.name) + self.assertEqual(document_file.title, self.name) + self.assertIn("Lorem ipsum dolor", document_file.document_data) diff --git a/app/general/tests/test_extract_text_service.py b/app/general/tests/test_extract_text_service.py index c15b3b35..fa1e7ecf 100644 --- a/app/general/tests/test_extract_text_service.py +++ b/app/general/tests/test_extract_text_service.py @@ -1,30 +1,18 @@ import os import unittest -from general.service.extract_text import GetTextFromPDF +from general.service.extract_text import pdf_to_text class TestExtractTextService(unittest.TestCase): def setUp(self): test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") - self.file_mock = test_dir + "/Lorem.pdf" - - def test_in_text(self): - with open(self.file_mock, "rb") as file: - pypdf = GetTextFromPDF(file) - - result = pypdf.to_text().strip() - - words = result.split() - - self.assertIn("turpis.", words) - - def test_not_in_text(self): - with open(self.file_mock, "rb") as file: - pypdf = GetTextFromPDF(file) - - result = pypdf.to_text().strip() - - words = result.split() - - self.assertNotIn("notintext.", words) + self.file_name = os.path.join(test_dir, "Lorem.pdf") + + def test_text_extraction(self): + with open(self.file_name, "rb") as file: + text = pdf_to_text(file) + self.assertIn("fermentum turpis.", text) + self.assertNotIn("notintext.", text) + self.assertGreater(len(text), 1470, "Too little text extracted") + self.assertGreater(len(text.split()), 220, "Too few words (spaces) extracted")