Skip to content

Commit

Permalink
Add naive solution(#1)
Browse files Browse the repository at this point in the history
* 🚧 Naive solution
  • Loading branch information
nymann authored Aug 27, 2022
1 parent 94f96ba commit 85da738
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ jobs:
python -m pip install --upgrade pip
- name: Run Lint
run: |
make tests
make lint
6 changes: 5 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ setup_requires =
pytest-runner
install_requires =
typer >= 0.4.1
block-pruner
pdf-information

[options.extras_require]
all =
Expand Down Expand Up @@ -77,7 +79,9 @@ show-source = True
strictness = long
inline-quotes = double
per-file-ignores =
tests/**.py:WPS218,WPS432,WPS442,S101,src/**/version.py:WPS410
tests/**.py:WPS218,WPS432,WPS442,S101,
src/pdf_scrub/version.py:WPS410
src/pdf_scrub/**.py:S603,S607,S108,WPS239,WPS237,WPS210,C901,WPS110,S404,WPS214

[isort]
combine_as_imports = True
Expand Down
11 changes: 9 additions & 2 deletions src/pdf_scrub/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from pathlib import Path

import typer

from pdf_scrub.pdf import PDF

app = typer.Typer()


@app.command()
def welcome(name: str) -> None:
    """Echo a one-line greeting addressed to *name*."""
    greeting = f"Welcome {name}!"
    typer.echo(greeting)
def scrub(files: list[Path]) -> None:
    """Scrub each PDF in *files* and echo every candidate output path.

    Each path is wrapped in a ``PDF`` and every value produced by its
    ``scrub()`` generator is echoed in the order it is produced.
    """
    for source in files:
        document = PDF(pdf_file=source)
        for candidate in document.scrub():
            typer.echo(candidate)


if __name__ == "__main__":
Expand Down
104 changes: 104 additions & 0 deletions src/pdf_scrub/pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from collections import Counter
from pathlib import Path
import subprocess
from typing import Iterable

from block_pruner import BlockPruner
from pdf_information import PDFInfo


class PDF:
def __init__(self, pdf_file: Path) -> None:
self.pdf_file = pdf_file
self.scrubbed = pdf_file
self.info = PDFInfo.from_cmd(pdf_file=pdf_file)

def scrub(self) -> Iterable[Path]:
if self.info.encrypted:
self.scrubbed = self._decrypt()
self.scrubbed = self._uncompress()
self.scrubbed = self._remove_metadata()
for index, possibility in enumerate(self._find_possible_watermarks()):
needle: str = possibility.strip("\n")
potential = self._remove_potential_watermark(needle=needle, attempt=index)
yield self._compress(potential=potential)

def _remove_metadata(self) -> Path:
block_pruner = BlockPruner(start=r"[0-9]+\ [0-9]+\ obj", end="endobj", needle="DocumentID")
remove_metadata = block_pruner.prune_file(self.scrubbed)
out = self._file_path("no_metadata")
with open(out, "w+b") as output_file:
output_file.write(remove_metadata)
return out

def _file_path(self, name: str | int) -> Path:
return self.scrubbed.parent.joinpath(f"{self.scrubbed.name}.{name}")

def _remove_potential_watermark(self, needle: str, attempt: int) -> Path:
block_pruner = BlockPruner(start=r"[0-9]+\ [0-9]+\ obj", end="endobj", needle=needle)
out = self._file_path(attempt)
potential = block_pruner.prune_file(self.scrubbed)
with open(out, "w+b") as output_file:
output_file.write(potential)
return out

def _decrypt(self) -> Path:
out = Path(f"/tmp/{self.scrubbed.name}.decrypted")
subprocess.check_call(
[
"qpdf",
"--decrypt",
self.scrubbed,
out,
],
)
return out

def _compress(self, potential: Path) -> Path:
out = Path(f"/tmp/{potential.name}.compressed")
subprocess.check_call(
[
"pdftk",
potential,
"output",
out,
"compress",
],
)
return out

def _uncompress(self) -> Path:
out = self._file_path("uncompressed")
subprocess.check_call(
[
"pdftk",
self.scrubbed,
"output",
out,
"uncompress",
],
)
return out

def _find_possible_watermarks(self) -> Iterable[str]:
if self.info.pages is None:
raise ValueError("Cannot find watermark if pages is none")
possibilities: dict[str, int] = {}
with open(file=self.scrubbed, mode="rb") as decrypted_file:
for raw_line in decrypted_file:
line = utf8_or_space(raw_line)
if "/Length" not in line:
continue
try:
possibilities[line] += 1
except KeyError:
possibilities[line] = 1
for key, val in possibilities.items():
if val == self.info.pages:
yield key


def utf8_or_space(line: bytes) -> str:
    """Decode *line* as UTF-8, falling back to an empty string.

    Despite the name, the fallback for undecodable input is ``""`` and not
    a space character.
    """
    try:
        decoded = line.decode()
    except UnicodeDecodeError:
        decoded = ""
    return decoded
22 changes: 0 additions & 22 deletions tests/unit_tests/greet_test.py

This file was deleted.

0 comments on commit 85da738

Please sign in to comment.