feat(pacer): add command to fetch docs filtered by page count from PACER #4901

Open · wants to merge 47 commits into base: main
Changes from 33 commits

Commits (47)
14c48e8
feat(pacer): add command to fetch docs filtered by page count from PACER
elisa-a-v Jan 4, 2025
b85778a
test(pacer): introduce tests for command to fetch docs from PACER
elisa-a-v Jan 7, 2025
9dca383
refactor(recap): abstract PACER doc fetch chain build from do_pacer_f…
elisa-a-v Jan 7, 2025
6e4b14b
refactor(pacer): bulk fetch command now uses CeleryThrottle
elisa-a-v Jan 8, 2025
88df431
test(pacer): update tests for new implementation
elisa-a-v Jan 8, 2025
2915cb9
fix(pacer): enable rate limiting for task to fetch doc from PACER
elisa-a-v Jan 9, 2025
e7e7d14
test(pacer): test rate limiting
elisa-a-v Jan 9, 2025
00eabb5
test(pacer): enhance test by adding subTest for round-robin asserts
elisa-a-v Jan 9, 2025
bd3ce86
Merge branch 'main' into 4839-known-big-docs-retrieval
elisa-a-v Jan 9, 2025
3f01cba
refactor(pacer): divide process in two stages: fetch and process
elisa-a-v Jan 22, 2025
8606f18
feat(pacer_bulk_fetch): add method to update fetches in progress
elisa-a-v Jan 28, 2025
2b750da
refactor(pacer_bulk_fetch): use instance attr to track fetches in pro…
elisa-a-v Jan 28, 2025
221f6fa
refactor(pacer_bulk_fetch): add helper func that appends a given valu…
elisa-a-v Jan 28, 2025
ade0846
feat(pacer_bulk_fetch): add retry mechanism for possibly stuck FQs
elisa-a-v Jan 28, 2025
20f1d85
refactor(pacer_bulk_fetch): remove duplicated code
elisa-a-v Jan 28, 2025
261da6e
refactor(pacer_bulk_fetch): keep track of timed out doc ids to avoid …
elisa-a-v Jan 28, 2025
5234012
feat(pacer_bulk_fetch): check RDs before attempting to process them
elisa-a-v Jan 28, 2025
66038a3
feat(pacer_bulk_fetch): avoid attempting to fetch previously fetched RDs
elisa-a-v Jan 28, 2025
1b7299b
feat(pacer_bulk_fetch): introduce arg to determine stage to run
elisa-a-v Jan 28, 2025
9c238ff
Merge branch 'main' into 4839-known-big-docs-retrieval
elisa-a-v Jan 28, 2025
db07b63
test(pacer_bulk_fetch): update tests for two-stage implementation
elisa-a-v Jan 29, 2025
b9d2cc3
fix(pacer_bulk_fetch): set default in cache.get
elisa-a-v Jan 29, 2025
818b5ca
fix(pacer_bulk_fetch): get proper fields from recap_document
elisa-a-v Jan 29, 2025
f03c091
refactor(pacer_bulk_fetch): replace args w/methods to get cache key f…
elisa-a-v Jan 31, 2025
bf45fbb
refactor(pacer_bulk_fetch): simplify courts query
elisa-a-v Jan 31, 2025
326df59
refactor(tests): remove useless variable
elisa-a-v Jan 31, 2025
f83126f
test(pacer_bulk_fetch): instead of patching cache methods, patch keys…
elisa-a-v Jan 31, 2025
624c616
refactor(pacer_bulk_fetch): use new methods to improve testability
elisa-a-v Jan 31, 2025
5c19ef8
fix(pacer_bulk_fetch): actually call method to get cache key
elisa-a-v Jan 31, 2025
a96d9fe
test(pacer_bulk_fetch): add unit tests for critical methods
elisa-a-v Jan 31, 2025
84a181b
test(pacer_bulk_fetch): clean up cache after unit tests
elisa-a-v Jan 31, 2025
e85d996
refactor(tasks): revert unnecessary changes
elisa-a-v Jan 31, 2025
ec0e8cf
Merge branch 'main' into 4839-known-big-docs-retrieval
elisa-a-v Jan 31, 2025
32e62e8
fix(recap): Set command defaults and streamline the code
albertisfu Feb 13, 2025
7af8b38
fix(recap): Removed unnecessary code and added a logger to handle_pro…
albertisfu Feb 13, 2025
b375b1c
fix(recap): Streamlined tests and pacer_bulk_fetch code blocks
albertisfu Feb 13, 2025
467c18a
fix(recap): Fixed mark FQs as completed
albertisfu Feb 14, 2025
02f8300
Merge branch 'main' into 4839-known-big-docs-retrieval
albertisfu Feb 14, 2025
5e7f6da
fix(recap): Refactor to properly handle failures when fetching documents
albertisfu Feb 14, 2025
42b63e1
fix(recap): Simplified code and added type hints
albertisfu Feb 14, 2025
6460057
fix(recap): Exclude RDs that belong to subdockets to ensure they are …
albertisfu Feb 14, 2025
be1392b
fix(recap): Resolved test cache collisions caused by identical key names
albertisfu Feb 14, 2025
40fbf0b
fix(recap): Refactor logger.info in pacer_bulk_fetch
albertisfu Feb 15, 2025
fe62d27
fix(recap): Introduced long_wait to wait for FQs to be completed afte…
albertisfu Feb 15, 2025
cfd9363
fix(recap): Refactor the waiting strategy for pacer_bulk_fetch betwee…
albertisfu Feb 17, 2025
02222b4
fix(recap): Fixed pacer_bulk_fetch type hints
albertisfu Feb 17, 2025
9c6c5d5
fix(recap): Added initial-backoff-time as a command argument
albertisfu Feb 17, 2025
385 changes: 385 additions & 0 deletions cl/search/management/commands/pacer_bulk_fetch.py
@@ -0,0 +1,385 @@
import logging
import time
from datetime import datetime, timedelta

from celery import chain
from django.contrib.auth.models import User
from django.core.cache import cache
from django.core.management.base import CommandError
from django.db.models import Q
from django.utils import timezone

from cl import settings
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import VerboseCommand
from cl.lib.pacer_session import get_or_cache_pacer_cookies
from cl.recap.models import PROCESSING_STATUS, REQUEST_TYPE, PacerFetchQueue
from cl.recap.tasks import fetch_pacer_doc_by_rd, mark_fq_successful
from cl.scrapers.tasks import extract_recap_pdf
from cl.search.models import Court, RECAPDocument

logger = logging.getLogger(__name__)


def append_value_in_cache(key, value):
    cached_docs = cache.get(key)
    if cached_docs is None:
        cached_docs = []
    cached_docs.append(value)
    one_month = 60 * 60 * 24 * 7 * 4
    cache.set(key, cached_docs, timeout=one_month)


class Command(VerboseCommand):
    help = "Download multiple documents from PACER with rate limiting"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.options = None
        self.user = None
        self.recap_documents = None
        self.courts_with_docs = {}
        self.total_launched = 0
        self.total_errors = 0
        self.max_retries = 5
        self.pacer_username = None
        self.pacer_password = None
        self.throttle = None
        self.queue_name = None
        self.interval = None
        self.fetches_in_progress = {}  # {court_id: (fq_pk, retry_count)}

    def add_arguments(self, parser) -> None:
        parser.add_argument(
            "--interval",
            type=float,
            help="The minimum wait in secs between PACER fetches to the same court.",
        )
        parser.add_argument(
            "--min-page-count",
            type=int,
            help="Get docs with this number of pages or more",
        )
        parser.add_argument(
            "--max-page-count",
            type=int,
            help="Get docs with this number of pages or less",
        )
        parser.add_argument(
            "--username",
            type=str,
            help="Username to associate with the processing queues (defaults to 'recap')",
        )
        parser.add_argument(
            "--queue-name",
            type=str,
            help="Celery queue name used for processing tasks",
        )
        parser.add_argument(
            "--testing",
            type=str,
            help="Prevents creation of log file",
        )
        parser.add_argument(
            "--stage",
            type=str,
            choices=["fetch", "process"],
            default="fetch",
            help="Stage of the command to run: fetch or process",
        )

    @staticmethod
    def docs_to_process_cache_key():
        """Helper method to improve testability."""
        return "pacer_bulk_fetch.docs_to_process"

    @staticmethod
    def timed_out_docs_cache_key():
        """Helper method to improve testability."""
        return "pacer_bulk_fetch.timed_out_docs"

    @staticmethod
    def setup_logging(testing: bool = False) -> None:
        if not testing:
            logging.basicConfig(
                filename=f'pacer_bulk_fetch_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log',
                level=logging.INFO,
                format="%(asctime)s - %(levelname)s - %(message)s",
            )
Contributor

Is there a reason for collecting logs into a file for this command? We usually monitor logs via Loki in Grafana, so this might not be necessary?

Contributor

Removed this method, since we don't need to store logs in a file.


    def setup_celery(self) -> None:
        """Setup Celery by setting the queue_name and throttle."""
        self.queue_name = self.options.get("queue_name", "pacer_bulk_fetch")
Member

Workers won't do tasks in queues they're not configured for, so if you run this code and we don't have a queue named pacer_bulk_fetch, you'll create tasks that never get processed. We have a handful of queues set up that you can use, called batch0 to batch5.

But that aside, what you're doing here is setting a default value for a command arg, which should be done up in argparse instead.

Contributor

Solved; it now defaults to batch0.
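A minimal sketch of moving that default into argparse, as suggested above; the comment and help text are illustrative, not the merged wording:

        parser.add_argument(
            "--queue-name",
            type=str,
            default="batch0",  # workers are already configured to consume batch0-batch5
            help="Celery queue name used for processing tasks",
        )

With the default defined in add_arguments, setup_celery can read self.options["queue_name"] directly instead of passing a second fallback to .get().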

        self.throttle = CeleryThrottle(queue_name=self.queue_name)

    def handle_pacer_session(self) -> None:
        """Make sure we have an active PACER session for the user."""
        self.pacer_username = self.options.get(
            "pacer_username", settings.PACER_USERNAME
        )
        self.pacer_password = self.options.get(
            "pacer_password", settings.PACER_PASSWORD
        )
        get_or_cache_pacer_cookies(
            self.user.pk,
            username=self.pacer_username,
            password=self.pacer_password,
        )

    def set_user(self, username: str) -> None:
        """Get user or raise CommandError"""
        if not username:
            raise CommandError(
                "No username provided, cannot create PacerFetchQueues."
            )
Member

If we have a default, then we shouldn't need this code.

Contributor

yeah I agree, this code has been removed

        try:
            self.user = User.objects.get(username=username)
        except User.DoesNotExist:
            raise CommandError(f"User {username} does not exist")
Member

I think you can just let it crash at this point, honestly, which makes this entire method unneeded.

Contributor

sure, method removed


    def identify_documents(self) -> None:
        """Get eligible documents grouped by court"""
        filters = [
            Q(pacer_doc_id__isnull=False),
            Q(is_available=False),
        ]
        if self.options.get("min_page_count"):
            filters.append(Q(page_count__gte=self.options["min_page_count"]))
        if self.options.get("max_page_count"):
            filters.append(Q(page_count__lte=self.options["max_page_count"]))
Member

If you use argparse to make min_page_count a required parameter and to default max_page_count to 10_000, you can get rid of Q altogether and just build the query the same way every time.

Member

I was tempted to ask for max_page_count to be removed under YAGNI, but I think we will need it when we do shorter docs next.

Contributor

min_page_count is now a required parameter, but only for --stage fetch, since it is not required for the extraction stage.

--max-page-count defaults to 10_000.
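A minimal sketch of the simplified query once both bounds are guaranteed (min required for the fetch stage, max defaulting to 10_000); this is a fragment of identify_documents, not the merged code:

        # Hypothetical simplification: with --min-page-count enforced and
        # --max-page-count defaulting to 10_000, both bounds are always
        # present and no Q objects are needed.
        eligible_docs = RECAPDocument.objects.filter(
            pacer_doc_id__isnull=False,
            is_available=False,
            page_count__gte=self.options["min_page_count"],
            page_count__lte=self.options["max_page_count"],
        )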


        # Do not attempt to fetch docs that were already fetched:
        cached_fetches = cache.get(self.docs_to_process_cache_key(), [])
        previously_fetched = [rd_pk for (rd_pk, _) in cached_fetches]
        # Only try again with those that were timed out before:
        cached_timed_out = cache.get(self.timed_out_docs_cache_key(), [])
        previously_timed_out = [rd_pk for (rd_pk, _) in cached_timed_out]
        redundant = set(previously_fetched) - set(previously_timed_out)
Member

Perhaps ids_to_skip is more clear?

Suggested change
        redundant = set(previously_fetched) - set(previously_timed_out)
        ids_to_skip = set(previously_fetched) - set(previously_timed_out)

Contributor

changed

        self.recap_documents = (
            RECAPDocument.objects.filter(*filters)
            .exclude(pk__in=redundant)
            .select_related("docket_entry__docket")
            .values(
                "id",
                "page_count",
                "docket_entry__docket__court_id",
                "pacer_doc_id",
            )
            .order_by("-page_count")
        )

        courts = (
            Court.objects.filter(
                id__in=[
                    recap_doc_id["docket_entry__docket__court_id"]
                    for recap_doc_id in self.recap_documents
                ]
            )
            .order_by("pk")
            .distinct()
        )

        for court in courts:
            self.courts_with_docs[court.pk] = [
                doc
                for doc in self.recap_documents
                if doc["docket_entry__docket__court_id"] == court.pk
            ]

    def enqueue_pacer_fetch(self, doc: dict) -> PacerFetchQueue:
        """Actually apply the task to fetch the doc from PACER.

        The ids for the fetched RD and their corresponding FQ are stored
        in cache so we know which ones to process in a later stage of
        this command.
        """
        self.throttle.maybe_wait()
Member

I haven't completely thought this through, but it feels like you've got this double-throttled. If you remove this throttle completely, would it work just fine? This throttle is usually used to prevent the queue from growing out of control, but that shouldn't happen anyway, since we're only doing one court at a time.

Contributor

I think this is fine to keep. Even though we're processing one court at a time, if documents belong to many courts (say around 200), it will still be possible to schedule 200 tasks at once. Using throttle.maybe_wait() will limit this to a maximum of 100 tasks enqueued.

        rd_pk = doc.get("id")
        fq = PacerFetchQueue.objects.create(
            request_type=REQUEST_TYPE.PDF,
            recap_document_id=rd_pk,
            user_id=self.user.pk,
        )
        fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).apply_async(
Contributor

I found an issue related to this line, but it also affects another line. I'll describe the issue here and then provide a related comment below.

The main problem is that PacerFetchQueue is not being marked as successfully completed during the retrieval stage. This process is handled by mark_fq_successful, which is currently called only after the document extraction is finished in the second stage of the command.

This can cause the command to get stuck while waiting for the FQs to be marked as completed. Specifically, in this line:

if skipped_courts == courts_at_start:
    time.sleep(self.interval)

Since all FQs remain in progress, the command keeps waiting unnecessarily.

To prevent this, the FQ should be marked as completed immediately after retrieval in fetch_pacer_doc_by_rd.

I tested this approach:

chain(
    fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).set(queue=self.queue_name),
    mark_fq_successful.si(fq.pk).set(queue=self.queue_name),
).apply_async()

This partially resolves the issue. However, marking the FQ as successful can still be delayed because both fetch_pacer_doc_by_rd and mark_fq_successful are processed by workers in the queue. Depending on worker availability, mark_fq_successful may take a few seconds to execute, leading to unnecessary retries or even exceeding the retry count, causing FQs to be incorrectly marked as timed out.

To fully resolve this, mark_fq_successful should be executed as part of the fetch_pacer_doc_by_rd task.

Currently, the mark_fq_successful task also sends webhooks for the Fetch API, so it makes sense to execute it as a secondary task. However, since webhooks are not required for this retrieval, it seems reasonable to update the FQ date_completed, status, and message directly within fetch_pacer_doc_by_rd.

We just need to ensure that this logic is not duplicated for regular FQ processing, which will continue relying on mark_fq_successful. One approach could be to introduce a new argument in fetch_pacer_doc_by_rd to indicate whether the successful FQ status should be updated within the task. Alternatively, we could move this logic entirely into fetch_pacer_doc_by_rd for all processes using it and simply split the webhook-sending functionality into a secondary task, whichever approach you think is best.

This should resolve the issue of incorrectly waiting for an FQ to finish when it was actually completed earlier.

Contributor

This has been resolved by moving the original fetch_pacer_doc_by_rd code to fetch_pacer_doc_by_rd_base. Now, fetch_pacer_doc_by_rd simply acts as a celery task wrapper, retaining the exact same arguments and behavior as before.

For the scope of this command, a new celery task, fetch_pacer_doc_by_rd_and_mark_fq_completed, has been introduced. It essentially retains the same functionality as fetch_pacer_doc_by_rd, with the addition that it is also responsible for marking the FQ as successful if the fetch completes successfully within the same celery task. This resolves the issue described here. (A rough sketch of this wrapper task follows the enqueue_pacer_fetch method below.)

            queue=self.queue_name
        )
        append_value_in_cache(self.docs_to_process_cache_key(), (rd_pk, fq.pk))
        self.total_launched += 1
        logger.info(
            f"Launched download for doc {doc.get('id')} from court {doc.get('docket_entry__docket__court_id')}"
            f"\nProgress: {self.total_launched}/{len(self.recap_documents)}"
        )
        return fq
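A rough sketch of the wrapper task described in the thread above. The fetch_pacer_doc_by_rd_base name comes from that thread; its location, signature, and return value, as well as the decorator and the exact fields updated, are assumptions rather than the actual implementation:

from celery import shared_task
from django.utils import timezone

from cl.recap.models import PROCESSING_STATUS, PacerFetchQueue
from cl.recap.tasks import fetch_pacer_doc_by_rd_base  # assumed location, per the thread above


@shared_task
def fetch_pacer_doc_by_rd_and_mark_fq_completed(rd_pk: int, fq_pk: int) -> None:
    """Sketch: fetch the doc, then mark the FQ successful inside the same
    Celery task so the command's polling loop sees date_completed immediately,
    without waiting on a second queued task."""
    success = fetch_pacer_doc_by_rd_base(rd_pk, fq_pk)  # assumed to report success
    if success:
        PacerFetchQueue.objects.filter(pk=fq_pk).update(
            status=PROCESSING_STATUS.SUCCESSFUL,
            date_completed=timezone.now(),
        )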

    def enough_time_elapsed(self, date):
        now = timezone.now()
        return (now - date) < timedelta(seconds=self.interval)
Contributor

As I understand it, this method checks whether the time elapsed since the FQ was marked as completed is more than self.interval, right? If so, I think the comparison operator here should be >?

Otherwise, if the FQ takes longer to complete than self.interval, this could cause the process to get stuck while checking this FQ.

Contributor

Yeah, the operator was inverted. It's fixed now.
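For reference, the corrected comparison discussed above would look roughly like this (a sketch of the fix, not the merged code):

    def enough_time_elapsed(self, date) -> bool:
        # True only once at least self.interval seconds have passed since `date`.
        return (timezone.now() - date) > timedelta(seconds=self.interval)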


    def should_skip(self, court_id: str) -> bool:
        """Determine if the court is ready to be queried again.

        To hit the same court again, the last fetch queue must have
        been completed more than `self.interval` seconds ago.
        """

        if court_id in self.fetches_in_progress:
            fq_pk, retry_count = self.fetches_in_progress[court_id]
            fetch_queue = PacerFetchQueue.objects.get(id=fq_pk)
            rd_pk = fetch_queue.recap_document_id
            if retry_count >= self.max_retries:
                # Too many retries means FQ was probably stuck and not handled gracefully, so we keep going.
                # We remove this FQ from fetches_in_progress as we'll stop checking this one
                self.fetches_in_progress.pop(court_id)
                # Then we store its PK in cache to handle FQs w/too many retries later
                append_value_in_cache(
                    self.timed_out_docs_cache_key(), (rd_pk, fq_pk)
                )
                return False

            date_completed = fetch_queue.date_completed
            fq_in_progress = date_completed is None
            if fq_in_progress or not self.enough_time_elapsed(date_completed):
Contributor

While debugging the issue described in a previous comment regarding mark_fq_successful, I noticed one of the reasons the command got stuck in a loop was when this condition was met:

if skipped_courts == courts_at_start:
    time.sleep(self.interval)

I wondered why this was happening, since there is logic above to abort an FQ in fetches_in_progress if retry_count >= self.max_retries. However, that was not occurring.

The issue is that the retry count is only updated within update_fetches_in_progress, which only seems to set the retry count to 1 for the FQ that was just scheduled. However, it is also necessary to update the retry_count for an FQ that remains in progress but has not yet been completed.

I think this logic could be added here (I tested it, and it worked), unless you identify a better place to implement it. One consideration is that retries should only be increased if fq_in_progress = True and not when self.enough_time_elapsed(date_completed) = False, because in the latter case, the FQ has already been marked as completed and just needs more time to satisfy the enough_time_elapsed condition.

Contributor

This is now resolved. The retry_count is now updated within the new process_skip_reason method, but only if the previously computed exponential backoff time has elapsed. (See the sketch after update_fetches_in_progress below.)

                return True

        return False

    def update_fetches_in_progress(self, court_id: str, fq_id: int):
        court_last_fetch = self.fetches_in_progress.get(court_id, (fq_id, 0))
        retry_count = court_last_fetch[1]
        if fq_id == court_last_fetch[0]:
            retry_count += 1
        self.fetches_in_progress[court_id] = (fq_id, retry_count)
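A rough sketch of the retry-count handling described in the thread above. The process_skip_reason name comes from that thread; the signature is hypothetical, and the exponential-backoff bookkeeping it mentions is omitted here:

    def process_skip_reason(self, court_id: str, fq_in_progress: bool) -> None:
        """Sketch only: count a retry for an FQ that is still in progress.

        The real method also applies an exponential backoff, so the count is
        only bumped once the backoff window has elapsed; that bookkeeping is
        omitted in this sketch.
        """
        if not fq_in_progress:
            # The FQ already completed; it just needs more wall-clock time to
            # satisfy enough_time_elapsed, so no retry is counted.
            return
        fq_pk, retry_count = self.fetches_in_progress[court_id]
        self.fetches_in_progress[court_id] = (fq_pk, retry_count + 1)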

    def fetch_next_doc_in_court(
        self,
        court_id: str,
        remaining_courts: dict,
    ) -> bool:
        """Pop next doc in court and add fetch task to get it from PACER.

        If the last FQ for the court is still in progress or was completed
        less than `self.interval` seconds ago, we skip it and wait for
        the next round to try the same court again.
        """
        should_skip = self.should_skip(court_id)
        if should_skip:
            return True

        if remaining_courts[court_id]:
            doc = remaining_courts[court_id].pop(0)
            try:
                fq = self.enqueue_pacer_fetch(doc)
                self.update_fetches_in_progress(court_id, fq.id)
            except Exception as e:
                self.total_errors += 1
                logger.error(
                    f"Error queuing document {doc.get('id')}: {str(e)}"
                )

        return False

    def fetch_docs_from_pacer(self) -> None:
        """Process documents with one fetch per court at a time"""
        self.handle_pacer_session()
        remaining_courts = self.courts_with_docs.copy()

        while remaining_courts:
            courts_at_start = len(remaining_courts)
            skipped_courts = 0
            for court_id in list(remaining_courts.keys()):
                was_skipped = self.fetch_next_doc_in_court(
                    court_id,
                    remaining_courts,
                )
                skipped_courts += int(was_skipped)
                # If this court doesn't have any more docs, remove from dict:
                if not remaining_courts[court_id]:
                    remaining_courts.pop(court_id)
                    self.fetches_in_progress.pop(court_id, None)
Contributor

Another issue I noticed is that if the last FQ in a court is not marked as completed for some reason, it will not be added to pacer_bulk_fetch.timed_out_docs in Redis. This happens because once it is removed from fetches_in_progress, it is no longer checked in should_skip.

The should_skip method is responsible for verifying whether the FQ has been completed or, if it exceeds the retry limit, adding it to pacer_bulk_fetch.timed_out_docs. Since the FQ is removed before this check happens, it never gets added to timed_out_docs.

I think a possible solution is to confirm that the FQ has been completed before removing it from fetches_in_progress.

Contributor

This is now handled within the new cleanup_finished_court method. A court is only removed from remaining_courts and fetches_in_progress once its last FQ has been completed. (See the sketch after this method.)


            # If we had to skip all courts that we tried this round,
            # add a small delay to avoid hammering the DB
            if skipped_courts == courts_at_start:
                time.sleep(self.interval)
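A rough sketch of the cleanup described in the thread above. The cleanup_finished_court name comes from that thread; the signature and the exact checks are assumptions, not the merged implementation:

    def cleanup_finished_court(self, court_id: str, remaining_courts: dict) -> None:
        """Sketch only: drop a court from the round-robin once it has no docs
        left to enqueue AND its last FQ has actually completed, so an
        unfinished FQ keeps being checked (and, if stuck, ends up in
        timed_out_docs) via should_skip."""
        if remaining_courts.get(court_id):
            return  # still has docs to enqueue
        last_fetch = self.fetches_in_progress.get(court_id)
        if last_fetch is None:
            remaining_courts.pop(court_id, None)
            return
        fq_pk, _ = last_fetch
        fq = PacerFetchQueue.objects.get(pk=fq_pk)
        if fq.date_completed is not None:
            remaining_courts.pop(court_id, None)
            self.fetches_in_progress.pop(court_id, None)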

    def process_docs_fetched(self):
        """Apply tasks to process docs that were successfully fetched from PACER."""
        cached_fetches = cache.get(self.docs_to_process_cache_key(), [])
        fetch_queues_to_process = [fq_pk for (_, fq_pk) in cached_fetches]
        fetch_queues = (
            PacerFetchQueue.objects.filter(pk__in=fetch_queues_to_process)
            .select_related("recap_document")
            .only(
                "pk",
                "status",
                "recap_document__id",
                "recap_document__ocr_status",
                "recap_document__is_available",
                "recap_document__filepath_local",
            )
        )
        for fq in fetch_queues:
            rd = fq.recap_document
            needs_ocr = rd.needs_extraction
            has_pdf = rd.filepath_local is not None
            fetch_was_successful = fq.status == PROCESSING_STATUS.SUCCESSFUL
            if fetch_was_successful and has_pdf and needs_ocr:
                self.throttle.maybe_wait()
                chain(
                    extract_recap_pdf.si(rd.pk),
                    mark_fq_successful.si(fq.pk),
                ).apply_async(queue=self.queue_name)
Contributor

This should become the following, since mark_fq_successful now happens in the retrieval stage:

Suggested change
                chain(
                    extract_recap_pdf.si(rd.pk),
                    mark_fq_successful.si(fq.pk),
                ).apply_async(queue=self.queue_name)
                extract_recap_pdf.si(rd.pk).apply_async(queue=self.queue_name)

Also, it would be a good idea to add a logger below this line to indicate which extraction document ID has been scheduled. Additionally, logging the progress status of tasks, such as the number of tasks scheduled and the number remaining, would be helpful.

Contributor

Updated and added a progress logger.
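A minimal sketch of what such a progress logger might look like inside the loop above; the counter names, log wording, and elided checks are illustrative:

        total_to_schedule = len(fetch_queues)
        scheduled = 0
        for fq in fetch_queues:
            rd = fq.recap_document
            # ... same success / has_pdf / needs_ocr checks as above ...
            self.throttle.maybe_wait()
            extract_recap_pdf.si(rd.pk).apply_async(queue=self.queue_name)
            scheduled += 1
            logger.info(
                "Scheduled extraction for RECAPDocument %s (%s/%s scheduled, %s left to schedule)",
                rd.pk,
                scheduled,
                total_to_schedule,
                total_to_schedule - scheduled,
            )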


    def handle_fetch_docs(self):
        """Run only the fetching stage."""
        logger.info("Starting fetch stage in pacer_bulk_fetch command.")
        try:
            self.set_user(self.options.get("username", "recap"))
Member

Again, the default should come from argparse.

Contributor

Solved; it now defaults to recap.

            self.identify_documents()
            logger.info(
                f"{self.user} found {len(self.recap_documents)} documents "
                f"across {len(self.courts_with_docs)} courts."
            )

            self.fetch_docs_from_pacer()

            logger.info(
                f"Created {self.total_launched} processing queues for a total "
                f"of {len(self.recap_documents)} docs found."
            )
            logger.info(
                f"The following PacerFetchQueues were retried too many times: "
                f"{cache.get(self.timed_out_docs_cache_key(), [])}"
            )
        except Exception as e:
Contributor

I think we can omit the try/except block here since it only logs an error message that might not be necessary. Instead, we can just let it crash, which would have almost the same effect as this block.

Contributor

Removed the try/except block

            logger.error(
                f"Fatal error in fetch stage: {str(e)}", exc_info=True
            )
            raise e

    def handle_process_docs(self):
        """Run only the processing stage."""
        logger.info("Starting processing stage in pacer_bulk_fetch command.")
        try:
            self.process_docs_fetched()
        except Exception as e:
            logger.error(
                f"Fatal error in process stage: {str(e)}", exc_info=True
            )
            raise e
Member

I don't think you're getting much from this method. Suggest merging with process_docs_fetched (or removing it and letting Sentry catch the error).

Contributor

I agree. This method has been removed.


    def handle(self, *args, **options) -> None:
        self.options = options
        self.setup_logging(self.options.get("testing", False))
        self.setup_celery()
        self.interval = self.options.get("interval", 2)

        stage = options.get("stage")
        if stage == "fetch":
            self.handle_fetch_docs()
        elif stage == "process":
            self.handle_process_docs()
        else:
            raise CommandError(
                "Invalid stage passed to pacer_bulk_fetch command."
            )
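Based on the arguments defined in add_arguments, a typical invocation of the fetch stage might look like the following; the page-count threshold, queue name, and interval are illustrative values, not recommendations:

from django.core.management import call_command

# Hypothetical invocation of the fetch stage of this command.
call_command(
    "pacer_bulk_fetch",
    stage="fetch",
    min_page_count=1000,
    queue_name="batch0",
    interval=2,
)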