Added hierarchical lock for applying data updates #745
base: master
Conversation
This is a good effort, but:
- You focused so much on restyling the code that it made reviewing very difficult.
- You have little to no documentation / comments (you actually removed comments) - on super sensitive code like this, that's too risky.
- Most crucial - the design breaks a core concurrency concept (a queue for fetching), which would hurt the performance of this component dramatically (we actually discussed this in the design phase).
@@ -107,7 +107,7 @@ async def handle_urls(
 results_with_url_and_config = [
     (url, config, result)
     for (url, config, data), result in zip(urls, results)
-    if result is not None
+    if result is not None  # FIXME ignores None results
Not sure if None (i.e. null) is a valid JSON reply over HTTP.
That's not necessarily from receiving a JSON null in the HTTP response - it's whenever the request had no body, or for any other reason a fetcher (HTTP or otherwise) would return None.
We used to silence that case - not log it and not report it back...
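For illustration only, here's roughly what surfacing that case could look like - a standalone sketch with made-up names (logger, pair_results), not the actual fetcher code:

```python
import logging

logger = logging.getLogger("data_fetcher")

def pair_results(urls, results):
    """Zip (url, config, data) triplets with their fetch results, logging
    (instead of silencing) the entries whose fetch returned nothing."""
    paired = []
    for (url, config, data), result in zip(urls, results):
        if result is None:
            # previously this case was dropped with no log and no report
            logger.warning("fetcher returned no data for %s, skipping entry", url)
            continue
        paired.append((url, config, result))
    return paired
```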
:param result: The fetched data
:param store_transaction: The policy store transaction
"""
policy_store_path = entry.dst_path or ""
What did you do here?
You renamed the functions for no reason, and you removed inline comments for no reason - which made the comparison here very difficult.
In general your code has zero inline comments - that's bad practice.
That's a new function, wrapping the different set_policy calls. It just has a similar name to the previous queue-handling function by accident...
What snippets do you think lack inline comments? Generally, whenever you feel the need to put an inline comment explaining what a piece of code should do, you should instead extract it to a function and name the function meaningfully. This way the code is more concise and easier to understand.
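A generic illustration of that principle (the entry/topic shapes below are invented for the example, not taken from this PR):

```python
from typing import Iterable

# Inline-comment style: a comment explains what the block does.
def process(entries: Iterable[dict], client_topics: set[str]) -> list[dict]:
    # keep only the entries whose topics overlap with this client's topics
    return [e for e in entries if set(e.get("topics", [])) & client_topics]

# Extracted style: the function name carries that explanation instead.
def entries_matching_client_topics(
    entries: Iterable[dict], client_topics: set[str]
) -> list[dict]:
    """Return only the entries whose topics overlap with this client's topics."""
    return [e for e in entries if set(e.get("topics", [])) & client_topics]
```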
you should instead extract it to a function and name the function meaningfully
Sometimes that's overkill.
In this case I'd agree the code can be refactored into smaller parts - but that would only make this PR, which should be focused on solving a bug, more complex.
The updater flow is pretty complex and needs more clarity:
- I'd start with the comments you removed from the code that is now part of _fetch_and_save_data
- I'd continue with comments on the lock mechanism, and comments explaining the concurrency model and transaction use
"""Fetches policy data (policy configuration) from backend and updates | ||
it into policy-store (i.e. OPA)""" | ||
reports: list[DataEntryReport] = [] | ||
for entry in update.entries: |
Using a for loop instead of a list comprehension (especially with logging inside) is a lot less performant.
It would be one thing if there were at least a reason to do this...
The performance difference is real, especially for larger lists, since appending in a loop may need to duplicate the array in memory multiple times.
Here, we are doing multiple things with each source entry, so a comprehension would not be much clearer or easier to understand. A generator might be a better way to write it to avoid the list duplication.
For this code specifically, data updates generally have just one or a few source entries, so this optimization isn't worth the reduction in code readability and simplicity.
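To make the trade-off concrete, a toy standalone sketch (none of these names come from the PR):

```python
import asyncio

async def fetch_entry(entry: str) -> dict:
    """Stand-in for the real per-entry fetch-and-store step."""
    await asyncio.sleep(0)
    return {"entry": entry, "fetched": True}

async def reports_with_loop(entries: list[str]) -> list[dict]:
    # Plain loop: easy to log, branch and handle errors per entry.
    reports = []
    for entry in entries:
        reports.append(await fetch_entry(entry))
    return reports

async def reports_with_comprehension(entries: list[str]) -> list[dict]:
    # Comprehension: avoids repeated appends, but gets awkward once
    # per-entry logging or error handling is needed.
    return [await fetch_entry(entry) for entry in entries]

print(asyncio.run(reports_with_loop(["users", "roles"])))
```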
I would have preferred that you not touch this part, which has nothing to do with the bug fix - this is a critical-fix CR, and this adds noise, especially when the change is potentially detrimental.
That said, I won't argue on this.
-self._updates_storing_queue = TakeANumberQueue(logger)
-self._tasks = TasksPool()
+self._tasks = asyncio.TaskGroup()
+self._dst_lock = HierarchicalLock()
Really? Zero comments on why this lock is here???
This is just creating the lock; there is a comment where it's used.
- I didn't really see inline comments on the usage (there's a CR comment on that as well)
- If someone needs to go into the code to understand a class, it's kind of missing the point
- This is a sensitive and unique aspect of this code - the basic reasoning should be communicated to future devs
transaction_context as store_transaction,
self._dst_lock.lock(
    entry.dst_path
),  # lock the destination path to prevent concurrent writes
Really, that's all the docs you're going to put on the race condition here?
This is super fragile code - it needs a ton of comments.
With this new flow, IMO it's simple enough to understand without needing excessive comments. Can you please be more specific about which code snippets lack comments or explanations?
I think, considering how elusive the bug was here, it's likely that code drift could lead to it or similar issues arising - unless devs understand the flow here VERY well.
Let's talk in person.
That's because the logic was super complex, involving 3 queues in different places that interact with the same objects. It was very hard to follow.
The flow is now much simpler: you can follow from function to function directly, the functions are smaller, and the concurrency model is naive (just run everything together and let the lock handle conflicting shared resources).
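To give future readers the basic idea, here is a minimal toy sketch of a hierarchical (path-prefix) destination lock - an assumption-laden illustration, not the PR's actual HierarchicalLock:

```python
import asyncio
from contextlib import asynccontextmanager

class PathLock:
    """Toy hierarchical lock: a path conflicts with itself, its ancestors and
    its descendants, so writes to "/users" and "/users/roles" are serialized
    while unrelated destination paths proceed in parallel."""

    def __init__(self) -> None:
        self._held: set[str] = set()
        self._cond = asyncio.Condition()

    @staticmethod
    def _conflicts(a: str, b: str) -> bool:
        a, b = a.rstrip("/") + "/", b.rstrip("/") + "/"
        return a.startswith(b) or b.startswith(a)

    @asynccontextmanager
    async def lock(self, path: str):
        async with self._cond:
            # wait until no currently-held path conflicts with this one
            await self._cond.wait_for(
                lambda: not any(self._conflicts(path, h) for h in self._held)
            )
            self._held.add(path)
        try:
            yield
        finally:
            async with self._cond:
                self._held.discard(path)
                self._cond.notify_all()
```

Usage would be something like `async with dst_lock.lock(entry.dst_path): ...` wrapped around the fetch-and-store step.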
This is somewhat simpler now, I agree - good job;
It's still worth documenting this - you can take this line that you wrote here:
the concurrency model is naive (just run everything together and let the lock handle conflicting shared resources)
and use it as the doc - and maybe elaborate a little bit.
Please don't argue on this - arguing would be more text than adding a few comments.
self._callbacks_reporter.report_update_results(
    whole_report, extra_callbacks
)
)

async def _fetch_and_save_data(
Fetching and storing should be two separate functions - if you combine them into one function like here, it should call two daughter functions, one for each, and contain almost no code of its own.
Separation of duties.
It mostly is: first it calls self._data_fetcher.handle_url(), then self._store_fetched_data() with the result.
The complex error handling is obscuring this flow; I'll try to extract that into another function.
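Roughly the shape being described, as a standalone sketch (names and signatures here are placeholders, not OPAL's API):

```python
import asyncio

async def fetch_entry(entry: dict) -> dict | None:
    """Fetch step - stand-in for the data fetcher's handle_url call."""
    await asyncio.sleep(0)
    return {"data": f"payload for {entry['dst_path']}"}

async def store_fetched_data(entry: dict, result: dict | None) -> dict:
    """Store step - stand-in for writing the result into the policy store."""
    return {"dst_path": entry["dst_path"], "saved": result is not None}

async def fetch_and_save_data(entry: dict) -> dict:
    # the combined function stays a thin orchestrator: one call per concern
    result = await fetch_entry(entry)
    return await store_fetched_data(entry, result)

print(asyncio.run(fetch_and_save_data({"dst_path": "/users"})))
```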
:return: The data entry report
"""
try:
    result = await self._data_fetcher.handle_url(
You killed the performance of this thing - you made all data fetching serial;
there is no longer a separate task for handling fetching, and the update task is singular.
This is the biggest issue here
No, data updates from the root are done concurrently - with a lock on each destination. Non-conflicting destinations (the common ones) will be handled in parallel.
See line 170:
self._tasks.create_task(self._update_policy_data(update))
We await the lock for this dst, then immediately start fetching the data, then set it into the policy store. This flow runs concurrently across all data updates.
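As a toy model of that flow (plain per-destination locks instead of the hierarchical lock, just to show which parts run concurrently; requires Python 3.11+ for asyncio.TaskGroup):

```python
import asyncio

async def update_policy_data(lock: asyncio.Lock, dst_path: str, payload: str) -> None:
    async with lock:                 # await the lock for this dst
        await asyncio.sleep(0.01)    # fetch the data
        print(f"stored {payload!r} at {dst_path}")  # set it into the policy store

async def main() -> None:
    locks = {"/users": asyncio.Lock(), "/roles": asyncio.Lock()}
    async with asyncio.TaskGroup() as tg:  # one task per incoming update
        tg.create_task(update_policy_data(locks["/users"], "/users", "u1"))
        tg.create_task(update_policy_data(locks["/roles"], "/roles", "r1"))
        # same destination as "u1", so this update waits for the lock
        tg.create_task(update_policy_data(locks["/users"], "/users", "u2"))

asyncio.run(main())
```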
You are right; I'm sorry I missed this (in my defence, the diff here is extremely hard to read).
A few more comments:
-# Fetching should be concurrent, but storing should be done in the original order
-store_queue_number = await self._updates_storing_queue.take_a_number()
-self._tasks.add_task(self._update_policy_data(update, store_queue_number))
+self._tasks.create_task(self._update_policy_data(update))
This could also use a comment explaining the async task flow here - i.e. which aspects are concurrent and which are not, and how this interplays with the lock mechanism.
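For example, a comment roughly along these lines next to the call (wording is only a suggestion):

```python
# Each incoming update gets its own task, so updates are fetched and applied
# concurrently. Ordering is only enforced per destination: inside the task we
# take the hierarchical dst lock, so writes to the same (or a nested) path are
# serialized while unrelated paths proceed in parallel.
self._tasks.create_task(self._update_policy_data(update))
```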
Steps:
1. Iterate over the DataUpdate entries.
2. For each entry, check if any of its topics match our client's topics.
3. Acquire a lock for the destination path, so we don't overwrite concurrently.
Suggested change:
-3. Acquire a lock for the destination path, so we don't overwrite concurrently.
+3. Acquire a lock for the destination path, so we don't fetch and overwrite concurrently.
+   - Note: This means that fetches that can technically happen concurrently wait on one another.
+     This can be improved with a Fetcher-Writer Lock (a la Reader-Writer Lock) pattern.
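For reference, a toy sketch of that Fetcher-Writer idea (reader-writer style, no fairness guarantees; not code from this PR):

```python
import asyncio
from contextlib import asynccontextmanager

class FetcherWriterLock:
    """Toy lock where many fetches may run concurrently, but a write excludes
    both other writes and in-flight fetches."""

    def __init__(self) -> None:
        self._fetchers = 0
        self._writing = False
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def fetching(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._fetchers += 1
        try:
            yield
        finally:
            async with self._cond:
                self._fetchers -= 1
                self._cond.notify_all()

    @asynccontextmanager
    async def writing(self):
        async with self._cond:
            await self._cond.wait_for(
                lambda: not self._writing and self._fetchers == 0
            )
            self._writing = True
        try:
            yield
        finally:
            async with self._cond:
                self._writing = False
                self._cond.notify_all()
```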
provided, generate one for tracking/logging.

Note:
Line 172 explanation:
LOL
"""Calculates a SHA-256 hash of the given data. If 'data' is not a | ||
string, it is first serialized to JSON. Returns an empty string on | ||
failure. |
This should explain the meaning, not just what it does.
"""Calculates a SHA-256 hash of the given data. If 'data' is not a | |
string, it is first serialized to JSON. Returns an empty string on | |
failure. | |
"""Calculates a SHA-256 hash of the given data to be used to identify the updates (e.g. in logging reports on the transactions) . If 'data' is not a | |
string, it is first serialized to JSON. Returns an empty string on | |
failure. |
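As a standalone illustration of that hashing helper (a sketch only - the real function's name and exact behavior may differ):

```python
import hashlib
import json

def data_hash(data) -> str:
    """Fingerprint an update so it can be identified in logs and transaction
    reports. Non-string data is serialized to JSON first; any failure degrades
    to an empty id instead of breaking the update flow."""
    try:
        if not isinstance(data, str):
            data = json.dumps(data, default=str)
        return hashlib.sha256(data.encode("utf-8")).hexdigest()
    except (TypeError, ValueError):
        return ""
```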
Great and important work.
A few last comments for final touches.
Celebration:
The Hierarchical Lock mechanism is very elegant.
Lessons for next time:
Keep it more focused on the core task, and add comments.