-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
109 additions
and
101 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,3 +23,4 @@ coverage.xml | |
htmlcov | ||
.idea | ||
.cache/ | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,104 +1,34 @@ | ||
from pathlib import Path | ||
import subprocess | ||
from typing import Iterable | ||
|
||
from block_pruner import BlockPruner | ||
from pdf_information import PDFInfo | ||
|
||
from pdf_scrub.toolkit import compress_pdf | ||
from pdf_scrub.toolkit import decrypt_pdf | ||
from pdf_scrub.toolkit import find_possible_watermark_needles | ||
from pdf_scrub.toolkit import remove_metadata | ||
from pdf_scrub.toolkit import remove_watermark | ||
from pdf_scrub.toolkit import uncompress_pdf | ||
|
||
|
||
class PDF: | ||
def __init__(self, pdf_file: Path) -> None: | ||
self.pdf_file = pdf_file | ||
self.scrubbed = pdf_file | ||
self.info = PDFInfo.from_cmd(pdf_file=pdf_file) | ||
|
||
def scrub(self) -> Iterable[Path]: | ||
if self.info.encrypted: | ||
self.scrubbed = self._decrypt() | ||
self.scrubbed = self._uncompress() | ||
self.scrubbed = self._remove_metadata() | ||
for index, possibility in enumerate(self._find_possible_watermarks()): | ||
needle: str = possibility.strip("\n") | ||
potential = self._remove_potential_watermark(needle=needle, attempt=index) | ||
yield self._compress(potential=potential) | ||
|
||
def _remove_metadata(self) -> Path: | ||
block_pruner = BlockPruner(start=r"[0-9]+\ [0-9]+\ obj", end="endobj", needle="DocumentID") | ||
remove_metadata = block_pruner.prune_file(self.scrubbed) | ||
out = self._file_path("no_metadata") | ||
with open(out, "w+b") as output_file: | ||
output_file.write(remove_metadata) | ||
return out | ||
|
||
def _file_path(self, name: str | int) -> Path: | ||
return self.scrubbed.parent.joinpath(f"{self.scrubbed.name}.{name}") | ||
|
||
def _remove_potential_watermark(self, needle: str, attempt: int) -> Path: | ||
block_pruner = BlockPruner(start=r"[0-9]+\ [0-9]+\ obj", end="endobj", needle=needle) | ||
out = self._file_path(attempt) | ||
potential = block_pruner.prune_file(self.scrubbed) | ||
with open(out, "w+b") as output_file: | ||
output_file.write(potential) | ||
return out | ||
|
||
def _decrypt(self) -> Path: | ||
out = Path(f"/tmp/{self.scrubbed.name}.decrypted") | ||
subprocess.check_call( | ||
[ | ||
"qpdf", | ||
"--decrypt", | ||
self.scrubbed, | ||
out, | ||
], | ||
) | ||
return out | ||
|
||
def _compress(self, potential: Path) -> Path: | ||
out = Path(f"/tmp/{potential.name}.compressed") | ||
subprocess.check_call( | ||
[ | ||
"pdftk", | ||
potential, | ||
"output", | ||
out, | ||
"compress", | ||
], | ||
) | ||
return out | ||
|
||
def _uncompress(self) -> Path: | ||
out = self._file_path("uncompressed") | ||
subprocess.check_call( | ||
[ | ||
"pdftk", | ||
self.scrubbed, | ||
"output", | ||
out, | ||
"uncompress", | ||
], | ||
) | ||
return out | ||
|
||
def _find_possible_watermarks(self) -> Iterable[str]: | ||
if self.info.pages is None: | ||
raise ValueError("Cannot find watermark if pages is none") | ||
possibilities: dict[str, int] = {} | ||
with open(file=self.scrubbed, mode="rb") as decrypted_file: | ||
for raw_line in decrypted_file: | ||
line = utf8_or_space(raw_line) | ||
if "/Length" not in line: | ||
continue | ||
try: | ||
possibilities[line] += 1 | ||
except KeyError: | ||
possibilities[line] = 1 | ||
for key, val in possibilities.items(): | ||
if val == self.info.pages: | ||
yield key | ||
|
||
|
||
def utf8_or_space(line: bytes) -> str:
    """Decode ``line`` as UTF-8, falling back to an empty string.

    Despite the name, the fallback for undecodable input is ``""``, not a
    space.

    Args:
        line: Raw bytes to decode.

    Returns:
        The decoded text, or ``""`` when ``line`` is not valid UTF-8.
    """
    try:
        text = line.decode()
    except UnicodeDecodeError:
        return ""
    return text
self.pdf_info = PDFInfo.from_cmd(pdf_file=pdf_file) | ||
|
||
def scrub(self, compress: bool) -> Iterable[bytes]: | ||
pdf_bytes_without_metadata: bytes = remove_metadata(pdf_bytes=self._get_uncompressed()) | ||
for needle in find_possible_watermark_needles(pdf_bytes=pdf_bytes_without_metadata, pdf_info=self.pdf_info): | ||
potentially_clean_pdf = remove_watermark(pdf_bytes=pdf_bytes_without_metadata, needle=needle) | ||
if compress: | ||
yield compress_pdf(pdf_bytes=potentially_clean_pdf) | ||
else: | ||
yield potentially_clean_pdf | ||
|
||
def _get_uncompressed(self) -> bytes: | ||
if self.pdf_info.encrypted: | ||
decrypted: bytes = decrypt_pdf(pdf_file=self.pdf_file) | ||
else: | ||
with open(file=self.pdf_file, mode="rb") as pdf_buffer: | ||
decrypted = pdf_buffer.read() | ||
return uncompress_pdf(pdf_file_buffer=decrypted) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from pathlib import Path | ||
import subprocess # noqa: S404 | ||
from typing import Iterable | ||
|
||
from block_pruner import BlockPruner | ||
from pdf_information import PDFInfo | ||
|
||
|
||
def _pdftk(stdin: bytes, command: str) -> bytes:
    """Pipe ``stdin`` through ``/usr/bin/pdftk`` and return its standard output.

    Both the input and the output file arguments are ``-``, so the document
    goes in on standard input and the result is captured from standard
    output.

    Args:
        stdin: Bytes fed to the process on standard input.
        command: Final ``pdftk`` argument, placed after the output target.

    Returns:
        Everything the process wrote to standard output.

    Raises:
        subprocess.CalledProcessError: If ``pdftk`` exits with a non-zero
            status.
    """
    argv = ["/usr/bin/pdftk", "-", "output", "-", command]
    finished = subprocess.run(  # noqa: S603
        argv,
        input=stdin,
        stdout=subprocess.PIPE,
        check=True,
    )
    return finished.stdout
|
||
|
||
def uncompress_pdf(pdf_file_buffer: bytes) -> bytes:
    """Return ``pdf_file_buffer`` after a ``pdftk`` ``uncompress`` pass.

    Args:
        pdf_file_buffer: Bytes of the PDF to process.

    Returns:
        The bytes produced by ``pdftk``.
    """
    uncompressed = _pdftk(command="uncompress", stdin=pdf_file_buffer)
    return uncompressed
|
||
|
||
def decrypt_pdf(pdf_file: Path) -> bytes:
    """Run ``/usr/bin/qpdf --decrypt`` on a file and return its standard output.

    The output target is ``-``, so the result is captured from standard
    output instead of being written to disk.

    Args:
        pdf_file: Path of the PDF to decrypt.

    Returns:
        Everything ``qpdf`` wrote to standard output.

    Raises:
        subprocess.CalledProcessError: If ``qpdf`` exits with a non-zero
            status.
    """
    argv = ["/usr/bin/qpdf", "--decrypt", pdf_file, "-"]
    return subprocess.check_output(argv)  # noqa: S603
|
||
|
||
def compress_pdf(pdf_bytes: bytes) -> bytes:
    """Return ``pdf_bytes`` after a ``pdftk`` ``compress`` pass.

    Args:
        pdf_bytes: Bytes of the PDF to process.

    Returns:
        The bytes produced by ``pdftk``.
    """
    compressed = _pdftk(command="compress", stdin=pdf_bytes)
    return compressed
|
||
|
||
def remove_metadata(pdf_bytes: bytes) -> bytes:
    """Run a ``DocumentID``-keyed ``BlockPruner`` over ``pdf_bytes``.

    The pruner is delimited by the ``<?xpacket begin`` and ``<?xpacket end``
    markers (presumably dropping each such packet that mentions
    ``DocumentID``; confirm against ``block_pruner``).

    Args:
        pdf_bytes: Bytes of the PDF to process.

    Returns:
        The bytes returned by ``BlockPruner.prune_bytes``.
    """
    xpacket_pruner = BlockPruner(
        start=r"<\?xpacket\ begin",
        end=r"<\?xpacket\ end",
        needle="DocumentID",
    )
    return xpacket_pruner.prune_bytes(input_data=pdf_bytes)
|
||
|
||
def remove_watermark(pdf_bytes: bytes, needle: str) -> bytes:
    """Run a ``BlockPruner`` keyed on ``needle`` over ``pdf_bytes``.

    The pruner is delimited by PDF object markers (``N M obj`` up to
    ``endobj``); presumably every such object containing ``needle`` is
    dropped, which should be confirmed against ``block_pruner``.

    Args:
        pdf_bytes: Bytes of the PDF to process.
        needle: Text identifying the blocks to prune.

    Returns:
        The bytes returned by ``BlockPruner.prune_bytes``.
    """
    object_pruner = BlockPruner(
        start=r"[0-9]+\ [0-9]+\ obj",
        end="endobj",
        needle=needle,
    )
    return object_pruner.prune_bytes(input_data=pdf_bytes)
|
||
|
||
def find_possible_watermark_needles(pdf_bytes: bytes, pdf_info: PDFInfo) -> Iterable[str]:
    """Yield ``/Length`` lines that occur exactly once per page.

    ``pdf_bytes`` is split into lines without terminators. Lines that decode
    and contain ``/Length`` are tallied, and those whose tally equals
    ``pdf_info.pages`` are yielded in first-seen order.

    Args:
        pdf_bytes: Bytes of the PDF to scan.
        pdf_info: Document information; only ``pages`` is read.

    Yields:
        Each matching line as text.

    Raises:
        ValueError: If ``pdf_info.pages`` is ``None``. Because this is a
            generator, the error surfaces on first iteration.
    """
    if pdf_info.pages is None:
        raise ValueError("Cannot find watermark if pages is none")

    tally: dict[str, int] = {}
    for raw_line in pdf_bytes.splitlines(keepends=False):
        text = utf8_or_space(raw_line)
        if "/Length" in text:
            tally[text] = tally.get(text, 0) + 1

    for text, seen in tally.items():
        if seen == pdf_info.pages:
            yield text
|
||
|
||
def utf8_or_space(line: bytes) -> str:
    """Return ``line`` decoded as UTF-8, or ``""`` if it cannot be decoded.

    Note that, despite the function's name, undecodable input yields an
    empty string and not a space.

    Args:
        line: Raw bytes to decode.

    Returns:
        The decoded text, or the empty string on ``UnicodeDecodeError``.
    """
    try:
        decoded = line.decode()
    except UnicodeDecodeError:
        decoded = ""
    return decoded