feat(pacer): add command to fetch docs filtered by page count from PACER #4901
base: main
Conversation
…etch - introduces new build_pdf_retrieval_task_chain method - refactors do_pacer_fetch to now use that new method instead
Instead of processing all tasks for each doc in a single chain, now we first fetch docs from PACER without processing them. This gives us more control to avoid hitting the same court too often. After all docs have been fetched, we process them all without any external rate limit.
…gress A local variable was being used and passed through several methods as arguments to keep track of the last fetch queue checked per court. We now use the instance attribute instead.
…e in cache We first check if the key exists and use the previous value if it does, otherwise the value is initialized with an empty list. We then append the value to that key-value pair. The logic is abstracted from update_cached_docs_to_process which now uses this new helper function.
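A minimal sketch of what such a helper could look like. The dict-backed `FakeCache` stands in for Django's cache backend here so the example is self-contained; the real helper presumably calls `cache.get`/`cache.set` on the configured backend, and the key name below is illustrative.

```python
# Dict-backed stand-in for Django's cache backend, for illustration only.
class FakeCache:
    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value, timeout=None):
        self._data[key] = value


cache = FakeCache()


def append_value_in_cache(key, value):
    """Append value to the list stored under key, creating the list if absent."""
    existing = cache.get(key)
    if existing is None:
        existing = []
    existing.append(value)
    cache.set(key, existing)


append_value_in_cache("timed_out_docs", 123)
append_value_in_cache("timed_out_docs", 456)
print(cache.get("timed_out_docs"))  # [123, 456]
```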
…redundant DB queries later
The pacer_bulk_fetch command can now be run in two distinct stages, one at a time, using the --stage arg. This gives us more control over the command's execution, and allows us time to check and fix any issues when fetching docs before beginning the processing stage.
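A sketch of how such a --stage flag might be declared with argparse; the stage names "fetch" and "process" are assumed from the description above, not taken from the actual code.

```python
import argparse

# Illustrative only: stage names are assumed, not confirmed from the PR.
parser = argparse.ArgumentParser(prog="pacer_bulk_fetch")
parser.add_argument(
    "--stage",
    choices=["fetch", "process"],
    required=True,
    help="Which stage of the command to run.",
)

args = parser.parse_args(["--stage", "fetch"])
print(args.stage)  # fetch
```

Using choices means argparse rejects any other value with a clear error, so the command can't silently run with a typo'd stage name.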
Elisa — this looks good! I made a few smallish comments, but I think you've got the design right. A few other thoughts:
- I haven't checked the tests. I leave that to Alberto.
- I've only skimmed the code for architecture, and anything else that jumps out, but it looks about right.
- I think this is a bit overbuilt, and I highlighted a few places where we can just let it crash, simply, etc. (Not the worst thing!)
- It looks like there are throttles to prevent hitting the DB and Redis too much, but let's make sure we don't have a loop that hits them non-stop. I think I checked this, but not carefully enough to be sure.
- Using redis for this is helpful, but we don't want to make objects in redis that last forever and that exist long after we've run this code. Can you add timeouts to them? Even if they're very long, we want that memory back eventually.
Looks good otherwise. Moby Dick would be proud, I think you caught your white whale.
def setup_celery(self) -> None:
    """Setup Celery by setting the queue_name and throttle."""
    self.queue_name = self.options.get("queue_name", "pacer_bulk_fetch")
Workers won't do tasks in queues they're not configured for, so if you run this code and we don't have a queue named pacer_bulk_fetch, you'll create tasks that never get processed. We have a handful of queues set up that you can use, called batch0 to batch5.
But that aside, what you're doing here is setting a default value for a command arg, which should be done up in argparse instead.
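A small sketch of what moving the default into argparse looks like; "batch0" matches the worker queues mentioned above.

```python
import argparse

# Declaring the default in argparse instead of at the call site,
# so self.options["queue_name"] is always populated.
parser = argparse.ArgumentParser()
parser.add_argument("--queue-name", type=str, default="batch0")

args = parser.parse_args([])  # no flag passed: the default applies
print(args.queue_name)  # batch0
```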
"""Run only the fetching stage.""" | ||
logger.info("Starting fetch stage in pacer_bulk_fetch command.") | ||
try: | ||
self.set_user(self.options.get("username", "recap")) |
Again, the default should come from argparse.
if not username:
    raise CommandError(
        "No username provided, cannot create PacerFetchQueues."
    )
If we have a default, then we shouldn't need this code.
try:
    self.user = User.objects.get(username=username)
except User.DoesNotExist:
    raise CommandError(f"User {username} does not exist")
I think you can just let it crash at this point, honestly, which makes this entire method unneeded.
filters = [
    Q(pacer_doc_id__isnull=False),
    Q(is_available=False),
]
if self.options.get("min_page_count"):
    filters.append(Q(page_count__gte=self.options["min_page_count"]))
if self.options.get("max_page_count"):
    filters.append(Q(page_count__lte=self.options["max_page_count"]))
If you use argparse to make min_page_count a required parameter and to default max_page_count to 10_000, you can get rid of Q altogether and just build the query the same way every time.
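For illustration, here is how the filter could be built as plain keyword arguments once both options are always present; the option values below are made up, and in the command the dict would be passed as RECAPDocument.objects.filter(**filters).

```python
# With min_page_count required and max_page_count defaulted in argparse,
# the queryset filter can be built the same way every time, without Q.
options = {"min_page_count": 1000, "max_page_count": 10_000}

filters = {
    "pacer_doc_id__isnull": False,
    "is_available": False,
    "page_count__gte": options["min_page_count"],
    "page_count__lte": options["max_page_count"],
}
print(sorted(filters))
```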
I was tempted to ask for max_page_count to be removed under YAGNI, but I think we will need it when we do shorter docs next.
# Only try again with those that were timed out before:
cached_timed_out = cache.get(self.timed_out_docs_cache_key(), [])
previously_timed_out = [rd_pk for (rd_pk, _) in cached_timed_out]
redundant = set(previously_fetched) - set(previously_timed_out)
Perhaps ids_to_skip is more clear?
- redundant = set(previously_fetched) - set(previously_timed_out)
+ ids_to_skip = set(previously_fetched) - set(previously_timed_out)
    in cache so we know which ones to process in a later stage of
    this command.
    """
    self.throttle.maybe_wait()
I haven't completely thought this through, but it feels like you've got this double-throttled. If you remove this throttle completely, would it work just fine? This throttle is usually used to prevent the queue from growing out of control, but that shouldn't happen anyway, since we're only doing one court at a time.
def handle_process_docs(self):
    """Run only the processing stage."""
    logger.info("Starting processing stage in pacer_bulk_fetch command.")
    try:
        self.process_docs_fetched()
    except Exception as e:
        logger.error(
            f"Fatal error in process stage: {str(e)}", exc_info=True
        )
        raise e
I don't think you're getting much from this method. Suggest merging with process_docs_fetched (or removing it and letting Sentry catch the error).
Thanks, @elisa-a-v! This looks about right. I only found some issues that might need to be addressed, along with some additional suggestions.
def add_arguments(self, parser) -> None:
    parser.add_argument(
        "--interval",
        type=float,
As Mike suggested, it's better to move the defaults here.
  type=float,
+ default=2,
    )
    parser.add_argument(
        "--min-page-count",
        type=int,
Here you can add the required parameter for this argument.
  type=int,
+ required=True,
    )
    parser.add_argument(
        "--username",
        type=str,
  type=str,
+ default="recap",
    )
    parser.add_argument(
        "--queue-name",
        type=str,
  type=str,
+ default="batch0",
    recap_document_id=rd_pk,
    user_id=self.user.pk,
)
fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).apply_async(
I found an issue related to this line, but it also affects another line. I'll describe the issue here and then provide a related comment below.
The main problem is that PacerFetchQueue is not being marked as successfully completed during the retrieval stage. This process is handled by mark_fq_successful, which is currently called only after the document extraction is finished in the second stage of the command.
This can cause the command to get stuck while waiting for the FQs to be marked as completed. Specifically, in this line:
if skipped_courts == courts_at_start:
    time.sleep(self.interval)
Since all FQs remain in progress, the command keeps waiting unnecessarily.
To prevent this, the FQ should be marked as completed immediately after retrieval in fetch_pacer_doc_by_rd.
I tested this approach:
chain(
fetch_pacer_doc_by_rd.si(rd_pk, fq.pk).set(queue=self.queue_name),
mark_fq_successful.si(fq.pk).set(queue=self.queue_name)
).apply_async()
This partially resolves the issue. However, marking the FQ as successful can still be delayed because both fetch_pacer_doc_by_rd and mark_fq_successful are processed by workers in the queue. Depending on worker availability, mark_fq_successful may take a few seconds to execute, leading to unnecessary retries or even exceeding the retry count, causing FQs to be incorrectly marked as timed out.
To fully resolve this, mark_fq_successful should be executed as part of the fetch_pacer_doc_by_rd task.
Currently, the mark_fq_successful task also sends webhooks for the Fetch API, so it makes sense to execute it as a secondary task. However, since webhooks are not required for this retrieval, it seems reasonable to update the FQ date_completed, status, and message directly within fetch_pacer_doc_by_rd.
We just need to ensure that this logic is not duplicated for regular FQ processing, which will continue relying on mark_fq_successful. One approach could be to introduce a new argument in fetch_pacer_doc_by_rd to indicate whether the successful FQ status should be updated within the task. Alternatively, we could move this logic entirely into fetch_pacer_doc_by_rd for all processes using it and simply split the webhook-sending functionality into a secondary task, whichever approach you think is best.
This should resolve the issue of incorrectly waiting for an FQ to finish when it was actually completed earlier.
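A rough sketch of the flag-based approach described above. Everything here is simplified for illustration: the FQ is reduced to a plain dict, the status strings are invented, and the parameter name mark_fq_successful_inline is hypothetical; the real fetch_pacer_doc_by_rd is a Celery task operating on PacerFetchQueue model instances.

```python
from datetime import datetime, timezone

# Hypothetical sketch only: flag name and status values are invented.
def fetch_pacer_doc_by_rd(rd_pk, fq, mark_fq_successful_inline=False):
    # ... PACER retrieval would happen here ...
    if mark_fq_successful_inline:
        # Update the FQ in-task so callers polling its status see it
        # complete immediately, without waiting on a second queued task.
        fq["status"] = "SUCCESSFUL"
        fq["date_completed"] = datetime.now(timezone.utc)
        fq["message"] = "Fetch completed successfully."
    return fq


fq = {"status": "IN_PROGRESS"}
fetch_pacer_doc_by_rd(42, fq, mark_fq_successful_inline=True)
print(fq["status"])  # SUCCESSFUL
```

Regular FQ processing would pass the flag as False (the default) and keep relying on mark_fq_successful, so the logic isn't duplicated.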
0: [DocketEntryFactory(docket=cls.dockets[0]) for _ in range(300)],
1: [DocketEntryFactory(docket=cls.dockets[1]) for _ in range(8)],
2: [DocketEntryFactory(docket=cls.dockets[2]) for _ in range(20)],
3: [DocketEntryFactory(docket=cls.dockets[3]) for _ in range(15)],
4: [DocketEntryFactory(docket=cls.dockets[4]) for _ in range(10)],
Is it possible to use fewer factories for these tests? I think we might be able to assert the same behavior with just a few of them; for example, instead of creating 300 for DocketEntry 0, we could create just 10, and so on.
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.Command.should_skip",
    return_value=False,
)
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.Command.docs_to_process_cache_key",
    return_value="pacer_bulk_fetch.test_page_count_filtering.docs_to_process",
)
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.Command.timed_out_docs_cache_key",
    return_value="pacer_bulk_fetch.test_page_count_filtering.timed_out_docs",
)
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.append_value_in_cache",
    wraps=append_value_in_cache,
)
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.CeleryThrottle.maybe_wait"
)
@patch("cl.search.management.commands.pacer_bulk_fetch.time.sleep")
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.get_or_cache_pacer_cookies"
)
@patch(
    "cl.search.management.commands.pacer_bulk_fetch.fetch_pacer_doc_by_rd.si"
)
I see that many of these mocks are shared across different tests in the class. You can move these common mocks (as long as they don't require a specific return_value or method unique to a test) to the class level. This would help reduce duplicated code across test methods.
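To illustrate the idea: mock.patch applied as a class decorator wraps every method whose name starts with "test", so shared mocks are declared once. The patch target below (time.sleep) just stands in for the command's mocks, and the class is a plain class rather than a Django TestCase to keep the example self-contained.

```python
import time
from unittest import mock

# A class-level patch wraps every test_* method and injects the mock
# as an extra argument, exactly as per-method @patch decorators do.
@mock.patch("time.sleep")
class FetchCommandTests:
    def test_sleep_is_mocked(self, mock_sleep):
        time.sleep(30)  # intercepted by the class-level patch
        assert mock_sleep.call_count == 1


FetchCommandTests().test_sleep_is_mocked()
print("shared mock applied")
```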
    mock_fetched_cache_key,
    mock_should_skip,
):
    """Test that rate limiting triggers sleep calls (mocked)."""
What is this test intended to check? Is it different from test_page_count_filtering?
},
{
    "name": "enough time passed",
    "time_elapsed": True,
Is it possible to test the actual enough_time_elapsed method in this test instead of just mocking its return value as True? I think this would be helpful in catching errors like the one mentioned in the previous comment related to enough_time_elapsed. time_machine can be useful for this.
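For illustration, a hypothetical re-creation of an enough_time_elapsed-style check (the real method's signature and internals may differ). Here the "now" is passed in explicitly so the example runs standalone; time_machine's travel() would instead let a test freeze the clock the method reads internally.

```python
import datetime as dt

# Hypothetical sketch; the real enough_time_elapsed likely reads the
# current time itself, which is where time_machine would come in.
def enough_time_elapsed(last_fetch, now, interval_seconds):
    return (now - last_fetch).total_seconds() >= interval_seconds


start = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc)
later = start + dt.timedelta(seconds=5)
print(enough_time_elapsed(start, later, 2))   # True
print(enough_time_elapsed(start, later, 10))  # False
```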
self.command.fetch_docs_from_pacer()

mock_sleep.assert_not_called()
Could you add a test case for the opposite scenario, where sleep should be called?
@patch("cl.recap.tasks.get_pacer_cookie_from_cache")
@patch(
    "cl.recap.tasks.is_pacer_court_accessible",
    side_effect=lambda a: True,
Semgrep identified an issue in your code: return only makes sense inside a function.
…purchased only once
Closes #4839
This PR implements a Django command to fetch docs from PACER within a given range of page_count.
The command is split into two stages. Instead of processing all tasks for each doc in a single chain, like the do_pacer_fetch method does, we first fetch docs from PACER without processing them. This means executing only the first task (fetch_pacer_doc_by_rd), which is the only one that interacts with the PACER API. This gives us more control to avoid hitting the same court too often, because the processing stage takes up most of the execution time for a given doc, and this time can range from a few seconds to several minutes.
To make sure we don't hit the same court too often, we keep track of the FQs in progress per court, and only try to fetch the next document in the same court if two conditions are met:
If these conditions are not met, we skip that court in that round and try again next round. This is retried up to max_retries (defaults to 5) attempts, and if after all those checks the FQ is still not complete, we store it in cache as "timed out" and continue with the next doc in that court.
During the fetching stage, we keep track of the fetched docs in cache so we know which ones to process in the second stage, and so we avoid fetching them again in case we need to re-run the first stage for some reason, like an interruption of any sort.
After all docs have been fetched, we run the second stage and process all the fetched docs without any external rate limit. We know which docs to process by checking the cache.
Separating the command into two stages also gives us a chance to make sure all docs were fetched correctly before processing them, to fix anything that needs fixing in case of errors, and to figure out what happened to the timed-out FQs.