
feat(pacer): add command to fetch docs filtered by page count from PACER #4901

Open · wants to merge 44 commits into base: main from 4839-known-big-docs-retrieval
Conversation

Contributor

@elisa-a-v elisa-a-v commented Jan 8, 2025

Closes #4839

This PR implements a Django command to fetch docs from PACER within a given page_count range.

The command is split into two stages, so instead of processing all tasks for each doc in a single chain—like the do_pacer_fetch method does—we first fetch docs from PACER without processing them. That means executing only the first task (fetch_pacer_doc_by_rd), which is the only one that interacts with the PACER API. This gives us more control to avoid hitting the same court too often, because the processing stage takes up most of the execution time for a given doc, and that time can range from a few seconds to several minutes.

To make sure we don't hit the same court too often, we keep track of the FQs in progress per court, and only try to fetch the next document in the same court if two conditions are met:

  1. the previous FQ in that court has been completed.
  2. enough time has elapsed since its completion (defaults to 2 seconds).

If these conditions are not met, we skip that court in that round and try again the next round. This is retried up to max_retries times (defaults to 5); if the FQ is still not complete after all those checks, we store it in cache as "timed out" and continue with the next doc in said court.
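The per-court skip check described above can be sketched roughly as follows. This is a minimal illustration only: the function name, the bookkeeping structure, and the tuple layout are assumptions, not the PR's actual code.

```python
import time


def should_skip(fetches_in_progress, court_id, interval=2.0, now=None):
    """Return True if this court should be skipped in the current round.

    `fetches_in_progress` maps court_id -> (completed, date_completed),
    where `date_completed` is a Unix timestamp or None.
    """
    now = now if now is not None else time.time()
    previous = fetches_in_progress.get(court_id)
    if previous is None:
        return False  # No FQ pending for this court; safe to fetch.
    completed, date_completed = previous
    if not completed:
        return True  # Condition 1 failed: previous FQ not completed.
    if date_completed is not None and now - date_completed < interval:
        return True  # Condition 2 failed: not enough time elapsed.
    return False
```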

During the fetching stage, we keep track of the fetched docs in cache so we know which ones to process in the second stage, and so we avoid fetching them again in case we need to rerun the first stage for some reason, like an interruption of any sort.

After all docs have been fetched, we run the second stage and process all the fetched docs without any external rate limit. We know which docs to process by checking the cache.

Separating the command into two stages also gives us a chance to verify that all docs were fetched correctly before processing them, to fix anything that needs fixing in case of errors, and to figure out what happened to the timed-out FQs.

…etch

- introduces new build_pdf_retrieval_task_chain method
- refactors do_pacer_fetch to now use that new method instead
Instead of processing all tasks for each doc in a single chain,
now we first fetch docs from PACER without processing them. This
gives us more control to avoid hitting the same court too often.
After all docs have been fetched, we process them all without any
external rate limit.
…gress

A local variable was being used and passed through several methods
as arguments to keep track of the last fetch queue checked per court.
We now use the instance attribute instead.
…e in cache

We first check if the key exists and use the previous value if it does,
otherwise the value is initialized with an empty list.
We then append the value to that key-value pair.

The logic is abstracted from update_cached_docs_to_process which now
uses this new helper function.
The pacer_bulk_fetch command can now be run in two distinct stages,
one at a time, using the --stage arg.

This gives us more control over the command execution, and allows
us time to check and fix any issues when fetching docs before
beginning the processing stage execution.
@elisa-a-v elisa-a-v force-pushed the 4839-known-big-docs-retrieval branch from 1172ba6 to a96d9fe on January 31, 2025 02:48
@elisa-a-v elisa-a-v marked this pull request as ready for review January 31, 2025 20:44
@elisa-a-v elisa-a-v requested a review from mlissner January 31, 2025 20:49
Member

@mlissner mlissner left a comment
Elisa — this looks good! I made a few smallish comments, but I think you've got the design right. A few other thoughts:

  1. I haven't checked the tests. I leave that to Alberto.
  2. I've only skimmed the code for architecture, and anything else that jumps out, but it looks about right.
  3. I think this is a bit overbuilt, and I highlighted a few places where we can just let it crash, simply, etc. (Not the worst thing!)
  4. It looks like there are throttles to prevent hitting the DB and Redis too much, but let's make sure we don't have a loop that hits them non-stop. I think I checked this, but not carefully enough to be sure.
  5. Using redis for this is helpful, but we don't want to make objects in redis that last forever and that exist long after we've run this code. Can you add timeouts to them? Even if they're very long, we want that memory back eventually.

Looks good otherwise. Moby Dick would be proud, I think you caught your white whale.


def setup_celery(self) -> None:
    """Setup Celery by setting the queue_name and throttle."""
    self.queue_name = self.options.get("queue_name", "pacer_bulk_fetch")
Workers won't do tasks in queues they're not configured for, so if you run this code and we don't have a queue named pacer_bulk_fetch, you'll create tasks that never get processed. We have a handful of queues set up that you can use called batch0 to batch5.

But that aside, what you're doing here is setting a default value for a command arg, which should be done up in argparse instead.
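For illustration, the default set in argparse rather than at the call site (flag name from the PR, "batch0" per the queue note above; a plain parser is used here, since Django's add_arguments receives an argparse parser):

```python
import argparse

# Django management commands hand an ArgumentParser to add_arguments();
# a bare parser demonstrates the same idea.
parser = argparse.ArgumentParser()
parser.add_argument("--queue-name", type=str, default="batch0")

args = parser.parse_args([])  # No flag passed: the default applies.
```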

"""Run only the fetching stage."""
logger.info("Starting fetch stage in pacer_bulk_fetch command.")
try:
self.set_user(self.options.get("username", "recap"))
Again, the default should come from argparse.

Comment on lines 131 to 134
if not username:
    raise CommandError(
        "No username provided, cannot create PacerFetchQueues."
    )

If we have a default, then we shouldn't need this code.

Comment on lines 135 to 138
try:
    self.user = User.objects.get(username=username)
except User.DoesNotExist:
    raise CommandError(f"User {username} does not exist")

I think you can just let it crash at this point, honestly, which makes this entire method unneeded.

Comment on lines 142 to 149
filters = [
    Q(pacer_doc_id__isnull=False),
    Q(is_available=False),
]
if self.options.get("min_page_count"):
    filters.append(Q(page_count__gte=self.options["min_page_count"]))
if self.options.get("max_page_count"):
    filters.append(Q(page_count__lte=self.options["max_page_count"]))

If you use argparse to make min_page_count a required parameter and to default max_page_count to 10_000, you can get rid of Q altogether and just build the query the same way every time.
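With min_page_count required and max_page_count defaulted in argparse, the filter can indeed be built the same way every time. A sketch (field names from the snippet above; the helper name is made up for illustration):

```python
def build_page_count_filters(options):
    """Build plain filter kwargs; no Q objects are needed once both
    page-count bounds are always present."""
    return {
        "pacer_doc_id__isnull": False,
        "is_available": False,
        "page_count__gte": options["min_page_count"],
        "page_count__lte": options["max_page_count"],
    }

# Usage sketch: RECAPDocument.objects.filter(**build_page_count_filters(options))
```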


I was tempted to ask for max_page_count to be removed under YAGNI, but I think we will need it when we do shorter docs next.

# Only try again with those that were timed out before:
cached_timed_out = cache.get(self.timed_out_docs_cache_key(), [])
previously_timed_out = [rd_pk for (rd_pk, _) in cached_timed_out]
redundant = set(previously_fetched) - set(previously_timed_out)

Perhaps ids_to_skip is more clear?

Suggested change
redundant = set(previously_fetched) - set(previously_timed_out)
ids_to_skip = set(previously_fetched) - set(previously_timed_out)

in cache so we know which ones to process in a later stage of
this command.
"""
self.throttle.maybe_wait()

I haven't completely thought this through, but it feels like you've got this double-throttled. If you remove this throttle completely, would it work just fine? This throttle is usually used to prevent the queue from growing out of control, but that shouldn't happen anyway, since we're only doing one court at a time.

Comment on lines 360 to 369
def handle_process_docs(self):
    """Run only the processing stage."""
    logger.info("Starting processing stage in pacer_bulk_fetch command.")
    try:
        self.process_docs_fetched()
    except Exception as e:
        logger.error(
            f"Fatal error in process stage: {str(e)}", exc_info=True
        )
        raise e

I don't think you're getting much from this method. Suggest merging with process_docs_fetched (or removing it and letting Sentry catch the error).

@mlissner mlissner assigned albertisfu and unassigned mlissner Jan 31, 2025
Contributor

@albertisfu albertisfu left a comment

Thanks, @elisa-a-v! This looks about right. I only found some issues that might need to be addressed, along with some additional suggestions.

def add_arguments(self, parser) -> None:
    parser.add_argument(
        "--interval",
        type=float,

As Mike suggested, it's better to move the defaults here.

Suggested change
type=float,
type=float,
default=2,

)
parser.add_argument(
"--min-page-count",
type=int,

Here you can add the required parameter for this argument.

Suggested change
type=int,
type=int,
required=True,

)
parser.add_argument(
"--username",
type=str,

Suggested change
type=str,
type=str,
default="recap",

)
parser.add_argument(
"--queue-name",
type=str,

Suggested change
type=str,
type=str,
default="batch0",

recap_document_id=rd_pk,
user_id=self.user.pk,
)
fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).apply_async(

I found an issue related to this line, but it also affects another line. I'll describe the issue here and then provide a related comment below.

The main problem is that PacerFetchQueue is not being marked as successfully completed during the retrieval stage. This process is handled by mark_fq_successful, which is currently called only after the document extraction is finished in the second stage of the command.

This can cause the command to get stuck while waiting for the FQs to be marked as completed. Specifically, in this line:

if skipped_courts == courts_at_start:
    time.sleep(self.interval)

Since all FQs remain in progress, the command keeps waiting unnecessarily.

To prevent this, the FQ should be marked as completed immediately after retrieval in fetch_pacer_doc_by_rd.

I tested this approach:

chain(
    fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).set(queue=self.queue_name),
    mark_fq_successful.si(fq.pk).set(queue=self.queue_name),
).apply_async()

This partially resolves the issue. However, marking the FQ as successful can still be delayed because both fetch_pacer_doc_by_rd and mark_fq_successful are processed by workers in the queue. Depending on worker availability, mark_fq_successful may take a few seconds to execute, leading to unnecessary retries or even exceeding the retry count, causing FQs to be incorrectly marked as timed out.

To fully resolve this, mark_fq_successful should be executed as part of the fetch_pacer_doc_by_rd task.

Currently, the mark_fq_successful task also sends webhooks for the Fetch API, so it makes sense to execute it as a secondary task. However, since webhooks are not required for this retrieval, it seems reasonable to update the FQ date_completed, status, and message directly within fetch_pacer_doc_by_rd.

We just need to ensure that this logic is not duplicated for regular FQ processing, which will continue relying on mark_fq_successful. One approach could be to introduce a new argument in fetch_pacer_doc_by_rd to indicate whether the successful FQ status should be updated within the task. Alternatively, we could move this logic entirely into fetch_pacer_doc_by_rd for all processes using it and simply split the webhook sending functionality into a secondary task, whichever approach you think is best.

This should resolve the issue of incorrectly waiting for an FQ to finish when it was actually completed earlier.
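One possible shape of the flag-based approach, sketched without Celery or the ORM. Everything here is a stand-in for illustration: FetchQueue is a plain object, the status constant is assumed, and the flag name is made up.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

PROCESSING_SUCCESSFUL = 2  # Stand-in for the real status constant.


@dataclass
class FetchQueue:
    pk: int
    status: int = 0
    date_completed: Optional[datetime] = None
    message: str = ""


def fetch_pacer_doc_by_rd(rd_pk, fq, mark_fq_successful_inline=False):
    # ... fetch the document from PACER here ...
    if mark_fq_successful_inline:
        # Update the FQ in-task so the bulk command sees completion
        # immediately, instead of waiting on a separately queued
        # mark_fq_successful task (which also sends webhooks that this
        # bulk retrieval does not need).
        fq.status = PROCESSING_SUCCESSFUL
        fq.date_completed = datetime.now(timezone.utc)
        fq.message = "Successfully completed fetch."
    return rd_pk
```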

Comment on lines 45 to 49
0: [DocketEntryFactory(docket=cls.dockets[0]) for _ in range(300)],
1: [DocketEntryFactory(docket=cls.dockets[1]) for _ in range(8)],
2: [DocketEntryFactory(docket=cls.dockets[2]) for _ in range(20)],
3: [DocketEntryFactory(docket=cls.dockets[3]) for _ in range(15)],
4: [DocketEntryFactory(docket=cls.dockets[4]) for _ in range(10)],

Is it possible to use fewer factories for these tests? I think we might be able to assert the same behavior with just a few of them. For example, instead of creating 300 for DocketEntry 0, we could create just 10, and so on.

Comment on lines 111 to 136
@patch(
"cl.search.management.commands.pacer_bulk_fetch.Command.should_skip",
return_value=False,
)
@patch(
"cl.search.management.commands.pacer_bulk_fetch.Command.docs_to_process_cache_key",
return_value="pacer_bulk_fetch.test_page_count_filtering.docs_to_process",
)
@patch(
"cl.search.management.commands.pacer_bulk_fetch.Command.timed_out_docs_cache_key",
return_value="pacer_bulk_fetch.test_page_count_filtering.timed_out_docs",
)
@patch(
"cl.search.management.commands.pacer_bulk_fetch.append_value_in_cache",
wraps=append_value_in_cache,
)
@patch(
"cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait"
)
@patch("cl.search.management.commands.pacer_bulk_fetch.time.sleep")
@patch(
"cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies"
)
@patch(
"cl.search.management.commands.pacer_bulk_fetch.fetch_pacer_doc_by_rd.si"
)

I see that many of these mocks are shared across different tests in the class. You can move these common mocks (as long as they don’t require a specific return_value or method unique to a test) to the class level. This would help reduce duplicated code across test methods.
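mock.patch applied as a class decorator wraps every test method in the class, which is one way to hoist the shared mocks. A generic, self-contained illustration (not the PR's actual tests or patch targets):

```python
import time
import unittest
from unittest import mock


@mock.patch("time.sleep")  # Shared by every test_* method in the class.
class ExampleTests(unittest.TestCase):
    # Class-level patches are passed into each test method, just like
    # method-level decorators, so per-test duplication goes away.
    def test_sleep_is_mocked(self, mock_sleep):
        time.sleep(10)  # Calls the mock, so no real delay happens.
        mock_sleep.assert_called_once_with(10)
```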

mock_fetched_cache_key,
mock_should_skip,
):
"""Test that rate limiting triggers sleep calls (mocked)."""

What is this test intended to check? Is it different from test_page_count_filtering?

},
{
"name": "enough time passed",
"time_elapsed": True,

Is it possible to test the actual enough_time_elapsed method in this test instead of just mocking its return value as True? I think this would be helpful in catching errors like the one mentioned in the previous comment related to enough_time_elapsed; time_machine can be useful for this.
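For illustration, the same idea using only the stdlib: control the clock rather than stubbing the method's return value. The enough_time_elapsed shape below is an assumption, and time_machine would replace the time.time patching in the real tests:

```python
import time
from unittest import mock


def enough_time_elapsed(date_completed, interval=2.0):
    """Assumed shape of the method under test: has `interval` seconds
    passed since `date_completed` (a Unix timestamp)?"""
    return time.time() - date_completed >= interval


# Drive the clock instead of mocking the method itself:
with mock.patch("time.time", return_value=100.0):
    assert enough_time_elapsed(99.5) is False  # Only 0.5s elapsed.
with mock.patch("time.time", return_value=103.0):
    assert enough_time_elapsed(99.5) is True  # 3.5s elapsed.
```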


self.command.fetch_docs_from_pacer()

mock_sleep.assert_not_called()

Could you add a test case for the opposite scenario, where sleep should be called?

@albertisfu albertisfu assigned elisa-a-v and unassigned albertisfu Feb 5, 2025
@mlissner mlissner assigned albertisfu and unassigned elisa-a-v Feb 10, 2025
@patch("cl.recap.tasks.get_pacer_cookie_from_cache")
@patch(
"cl.recap.tasks.is_pacer_court_accessible",
side_effect=lambda a: True,

Semgrep identified an issue in your code:

return only makes sense inside a function

To ignore this finding, leave a nosemgrep comment directly above or at the end of line 522 (# nosemgrep: python.lang.maintainability.return.return-not-in-function), after validating that this is not a true positive.

Labels
None yet
Projects
Status: In progress
Development

Successfully merging this pull request may close these issues.

Make a script to get the docs we already know are >= 1000 pages long
3 participants