-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Simpler API and improved tests, including some cleanup. The admin form only parses the PDF if it passes the other validation. The mass import command is now called with a directory parameter, and only reads the files instead of moving them around and pretty-printing.
- Loading branch information
1 parent
401f92d
commit d951b01
Showing
5 changed files
with
95 additions
and
176 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,109 +1,54 @@ | ||
# TODO: | ||
# - Provide better command-line parameters for control, e.g. | ||
# - import for given institution | ||
# - associate with specific language(s)/subject(s) | ||
# - make usable outside Docker | ||
|
||
import os | ||
import random | ||
import shutil | ||
|
||
import magic | ||
from django.core.files.base import ContentFile | ||
from django.core.management.base import BaseCommand | ||
|
||
from general.models import DocumentFile | ||
from general.service.extract_text import GetTextError, GetTextFromPDF | ||
from general.service.extract_text import GetTextError, pdf_to_text | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Mass PDF uploader for testing purposes." | ||
help = "Mass PDF uploader for testing purposes" | ||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
self.dir_main = "/pdf_uploads/" | ||
self.dir_completed = "/pdf_upload_completed/completed/" | ||
self.dir_error = "/pdf_upload_completed/error/" | ||
def add_arguments(self, parser): | ||
parser.add_argument("directory", help="Directory with files to import") | ||
|
||
def handle(self, *args, **options): | ||
print("Mass file uploader for testing purposes.") | ||
|
||
self.create_directory(self.dir_completed) | ||
self.create_directory(self.dir_error) | ||
|
||
for root, dirs, files in os.walk(self.dir_main): | ||
for root, dirs, files in os.walk(options["directory"]): | ||
for file in files: | ||
if not os.path.splitext(file)[1] == ".pdf": | ||
continue | ||
file_path = os.path.join(root, file) | ||
|
||
# Check if the file is a PDF file and save the data | ||
self.handle_file(file_path, file) | ||
|
||
def handle_file(self, file_path, file): | ||
# Get the file type | ||
def handle_file(self, file_path, file_name): | ||
print(file_name) | ||
file_type = magic.from_file(file_path, mime=True) | ||
|
||
# Check if the file is a PDF file | ||
directory = self.check_file_type(file_type) | ||
self.print_pdf_file(file) | ||
|
||
# If file is a PDF file it saves the data and moves the file to the completed directory | ||
if directory: | ||
data = { | ||
"title": file.strip(), | ||
"file": file.strip(), | ||
"uploaded_file": file_path, | ||
} | ||
# Save the data to the database and uploads the file | ||
self.save_data(data) | ||
|
||
# Move the file to the completed directory | ||
self.move_file(file_path, file, directory) | ||
|
||
# If the file is not a PDF file, print an error message and move the file to the error directory | ||
else: | ||
self.print_error() | ||
# Move the file to the error directory | ||
self.move_file(file_path, file, self.dir_error) | ||
|
||
def check_file_type(self, file_type): | ||
return self.dir_completed if file_type == "application/pdf" else None | ||
|
||
def move_file(self, file_path, file, directory): | ||
if not os.path.isfile(directory + file): | ||
shutil.move(file_path, directory) | ||
if file_type == "application/pdf": | ||
self.save_data(file_path, file_name) | ||
else: | ||
print( | ||
f"The file '{os.path.basename(directory + file)}' already exists in the destination directory." | ||
) | ||
|
||
def print_pdf_file(self, file): | ||
print("\033[92m" + file + "\033[0m") | ||
|
||
def print_error(self): | ||
print("\033[91m" + "Only PDF files are allowed" + "\033[0m") | ||
|
||
def save_data(self, data): | ||
# Generate a random number for the institution ID | ||
random_number = random.randint(1, 20) | ||
print("Only PDF files are allowed") | ||
|
||
# Read the uploaded file data | ||
with open(data["uploaded_file"], "rb") as f: | ||
get_content_file = f.read() | ||
|
||
content_file = ContentFile(get_content_file, name=data["title"]) | ||
def save_data(self, file_path, file_name): | ||
with open(file_path, "rb") as f: | ||
content_file = ContentFile(f.read(), name=file_name) | ||
|
||
try: | ||
document_data = GetTextFromPDF(data["uploaded_file"]).to_text() | ||
|
||
instance = DocumentFile( | ||
title=data["title"], | ||
document_data=document_data, # Scraps the PDF file and extracts the text | ||
title=file_name, | ||
document_data=pdf_to_text(file_path), | ||
uploaded_file=content_file, | ||
document_type="Glossary", | ||
institution_id=random_number, | ||
institution_id=random.randint(1, 20), | ||
) | ||
instance.save() | ||
|
||
except GetTextError as e: | ||
print(f"Error: {e}") | ||
return | ||
|
||
def create_directory(self, directory): | ||
try: | ||
os.makedirs(directory, exist_ok=True) | ||
except OSError as error: | ||
print(f"Directory '{directory}' can not be created. Error: {error}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,30 @@ | ||
from pypdf import PdfReader | ||
from pypdf.errors import PdfStreamError | ||
# TODO: | ||
# - remove unneeded whitespace (e.g. multiple consecutive spaces) | ||
# - remove unprintable characters, or replace them with some symbol | ||
# character so that excerpts look better. | ||
# - consider removing a few overly common words, such as single digits ("1", etc.), | ||
# or maybe anything that occurs too frequently in the full-text index that | ||
# could cause a full-table scan. | ||
# - consider multilingual stemming to enhance chances of success in a multi- | ||
# lingual setup... hard! | ||
|
||
|
||
class GetTextError(Exception): | ||
pass | ||
|
||
|
||
class GetTextFromPDF: | ||
def __init__(self, uploaded_file): | ||
self.uploaded_file = uploaded_file | ||
def pdf_to_text(pdf): | ||
# imports postponed, as they will normally not be needed frequently | ||
from pypdf import PdfReader | ||
from pypdf.errors import PdfStreamError | ||
|
||
def to_text(self): | ||
if self.uploaded_file: | ||
text_list = [] | ||
# Read the PDF file and extract text | ||
try: | ||
reader = PdfReader(self.uploaded_file) | ||
for page in reader.pages: | ||
text_list.append(page.extract_text()) | ||
text_list = [] | ||
try: | ||
for page in PdfReader(pdf).pages: | ||
text_list.append(page.extract_text()) | ||
except PdfStreamError: | ||
raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") | ||
except Exception: | ||
raise GetTextError("Error during text extraction from PDF file.") | ||
|
||
get_pdf_text = " ".join(text_list) | ||
|
||
return str(get_pdf_text) | ||
|
||
except PdfStreamError: | ||
raise GetTextError("The uploaded PDF file is corrupted or not fully downloaded.") | ||
except Exception: | ||
raise GetTextError("Error during text extraction from PDF file.") | ||
return " ".join(text_list) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,18 @@ | ||
import os | ||
import unittest | ||
|
||
from general.service.extract_text import GetTextFromPDF | ||
from general.service.extract_text import pdf_to_text | ||
|
||
|
||
class TestExtractTextService(unittest.TestCase): | ||
def setUp(self): | ||
test_dir = os.getenv("TESTING_DIR", "/app/general/tests/files") | ||
self.file_mock = test_dir + "/Lorem.pdf" | ||
|
||
def test_in_text(self): | ||
with open(self.file_mock, "rb") as file: | ||
pypdf = GetTextFromPDF(file) | ||
|
||
result = pypdf.to_text().strip() | ||
|
||
words = result.split() | ||
|
||
self.assertIn("turpis.", words) | ||
|
||
def test_not_in_text(self): | ||
with open(self.file_mock, "rb") as file: | ||
pypdf = GetTextFromPDF(file) | ||
|
||
result = pypdf.to_text().strip() | ||
|
||
words = result.split() | ||
|
||
self.assertNotIn("notintext.", words) | ||
self.file_name = os.path.join(test_dir, "Lorem.pdf") | ||
|
||
def test_text_extraction(self): | ||
with open(self.file_name, "rb") as file: | ||
text = pdf_to_text(file) | ||
self.assertIn("fermentum turpis.", text) | ||
self.assertNotIn("notintext.", text) | ||
self.assertGreater(len(text), 1470, "Too little text extracted") | ||
self.assertGreater(len(text.split()), 220, "Too few words (spaces) extracted") |