
Added hierarchical lock for applying data updates #745

Open · wants to merge 4 commits into base: master

Conversation

danyi1212 (Collaborator):

Fixes Issue

Changes proposed

Check List (Check all the applicable boxes)

  • I sign off on contributing this submission to open-source
  • My code follows the code style of this project.
  • My change requires changes to the documentation.
  • I have updated the documentation accordingly.
  • All new and existing tests passed.
  • This PR does not contain plagiarized content.
  • The title of my pull request is a short description of the requested changes.

Screenshots

Note to reviewers

@danyi1212 danyi1212 self-assigned this Jan 27, 2025
netlify bot commented Jan 27, 2025:

Deploy Preview for opal-docs canceled.

🔨 Latest commit: 5eabfab
🔍 Latest deploy log: https://app.netlify.com/sites/opal-docs/deploys/679a3661ec8c9f0009c9bf1d

orweis (Contributor) left a comment:

This is a good effort, but:

  1. You focused so much on restyling the code that it made reviewing very difficult.
  2. You have little to no documentation / comments (you actually removed comments) on super sensitive code like this; it's too risky.
  3. Most crucially, the design breaks a core concurrency concept (the queue for fetching), which would hurt the performance of this component dramatically (we actually discussed this in the design phase).

@@ -107,7 +107,7 @@ async def handle_urls(
results_with_url_and_config = [
(url, config, result)
for (url, config, data), result in zip(urls, results)
if result is not None
if result is not None # FIXME ignores None results
Contributor:

Not sure if None (i.e. null) is a valid JSON reply over HTTP.

Collaborator (Author):

That's not necessarily from receiving a JSON null in the HTTP response; it happens when the request had no body, or for any other reason a fetcher (HTTP or otherwise) would return None.
We used to silence that case: not log it and not report it back...
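For illustration, surfacing that previously silenced case might look like this (hypothetical names, not the actual OPAL code):

```python
import logging

logger = logging.getLogger(__name__)

def filter_results(urls_with_config, results):
    """Pair each (url, config) with its fetch result, logging (rather than
    silently dropping) fetches that returned None (e.g. an empty HTTP body)."""
    kept = []
    for (url, config, _data), result in zip(urls_with_config, results):
        if result is None:
            # Previously this case was silenced; log it so failures are visible.
            logger.warning("fetcher returned no data for %s", url)
            continue
        kept.append((url, config, result))
    return kept
```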

:param result: The fetched data
:param store_transaction: The policy store transaction
"""
policy_store_path = entry.dst_path or ""
Contributor:

What did you do here?
You renamed the functions for no reason, and you removed inline comments for no reason, which made the comparison here very difficult.

In general your code has zero inline comments; that's bad practice.

Collaborator (Author):

That's a new function wrapping the different set_policy calls. It just has a similar name to the previous queue-handling function by accident...

Which snippets do you think lack inline comments? Generally, whenever you feel the need to put an inline comment explaining what a piece of code should do, you should instead extract it to a function and name the function meaningfully. This way the code is more concise and easier to understand.
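As a generic illustration of that extract-and-name approach (the example names here are invented, not from this PR):

```python
# Before: an inline comment explains the intent.
def matching_entries_before(entries):
    # keep only entries whose topics match ours
    return [e for e in entries if e["topic"] in {"users", "roles"}]

# After: the intent is carried by a well-named helper instead of a comment.
OUR_TOPICS = {"users", "roles"}

def _matches_our_topics(entry) -> bool:
    return entry["topic"] in OUR_TOPICS

def matching_entries_after(entries):
    return [e for e in entries if _matches_our_topics(e)]
```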

Contributor:

> you should instead extract it to a function and name the function meaningfully

Sometimes that's overkill.

In this case I'd agree the code can be refactored into smaller parts, but that would only make this PR, which should be focused on solving a bug, more complex.

The updater flow is pretty complex and needs more clarity:

  • I'd start with the comments you removed from the code that is now part of _fetch_and_save_data.
  • I'd continue with comments on the lock mechanism, and comments explaining the concurrency model and transaction use.

"""Fetches policy data (policy configuration) from backend and updates
it into policy-store (i.e. OPA)"""
reports: list[DataEntryReport] = []
for entry in update.entries:
Contributor:

Using a for loop instead of a list comprehension (especially with logging inside) is a lot less performant.
At least I'd expect there to be a reason for doing this.

Collaborator (Author):

The performance difference is real, especially for larger lists, since the list may need to be duplicated in memory multiple times.
Here we are doing multiple things with each source entry, so a comprehension would not be much clearer or more understandable. A generator might be a better way to write it, to avoid the list duplication.

For this code specifically, data updates generally have just one or a few source entries, so this optimization is not worth the reduction in code readability and simplicity.
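The trade-off under discussion, sketched generically (not the actual updater code): an explicit loop makes per-entry side effects like logging straightforward, while a generator avoids materializing intermediate lists:

```python
import logging

logger = logging.getLogger(__name__)

def build_reports_loop(entries):
    """Explicit loop: easy to log and do several things per entry."""
    reports = []
    for entry in entries:
        logger.debug("processing entry %s", entry)
        reports.append({"entry": entry, "saved": True})
    return reports

def build_reports_gen(entries):
    """Generator: lazily yields reports without duplicating the list in memory."""
    return ({"entry": entry, "saved": True} for entry in entries)
```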

Contributor:

I would have preferred that you hadn't touched this part, which has nothing to do with the bug fix; as this is a critical-fix CR, this adds noise, especially when the change is potentially detrimental.
That said, I won't argue on this.

self._updates_storing_queue = TakeANumberQueue(logger)
self._tasks = TasksPool()
self._tasks = asyncio.TaskGroup()
self._dst_lock = HierarchicalLock()
Contributor:

Really? Zero comments on why this lock is here???

Collaborator (Author):

This is just creating the lock; there is a comment where it's used.

Contributor:

  1. I didn't really see inline comments on the usage (there's a CR comment on that as well).
  2. If someone needs to go into the code to understand a class, it's kind of missing the point.
  3. This is a sensitive and unique aspect of this code; the basic reasoning should be communicated to future devs.
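For illustration, one plausible shape for a hierarchical destination-path lock (a hedged sketch; the PR's actual HierarchicalLock implementation is not shown in this thread). The idea assumed here: locking a path also conflicts with its ancestors and descendants, so writes to "/a" and "/a/b" serialize while "/a/b" and "/a/c" proceed independently:

```python
import asyncio
from contextlib import asynccontextmanager

class HierarchicalLock:
    """Lock on a '/'-separated path that also conflicts with any ancestor
    or descendant path (e.g. '/a' conflicts with '/a/b' but not '/b')."""

    def __init__(self):
        self._locked = set()              # paths currently held
        self._cond = asyncio.Condition()  # guards _locked and wakes waiters

    @staticmethod
    def _conflicts(a: str, b: str) -> bool:
        # Same path, or one is a prefix-directory of the other.
        return a == b or a.startswith(b + "/") or b.startswith(a + "/")

    @asynccontextmanager
    async def lock(self, path: str):
        async with self._cond:
            # Wait until no held lock conflicts with this path.
            await self._cond.wait_for(
                lambda: not any(self._conflicts(path, p) for p in self._locked)
            )
            self._locked.add(path)
        try:
            yield
        finally:
            async with self._cond:
                self._locked.discard(path)
                self._cond.notify_all()
```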

transaction_context as store_transaction,
self._dst_lock.lock(
entry.dst_path
), # lock the destination path to prevent concurrent writes
Contributor:

Really, that's all the docs you're going to put on the race condition here?
This is super fragile code; it needs a ton of comments.

Collaborator (Author):

With this new flow, IMO it's simple enough to understand without needing excessive comments. Can you be more specific about which code snippets lack comments or explanations?

Contributor:

I think, considering how elusive the bug was here, it is likely that code drift could lead to it or similar issues arising, unless devs understand the flow here VERY well.

Let's talk in person.

Collaborator (Author):

That's because the old logic was super complex, involving 3 queues, in different places, which interact with the same objects. This was very hard to follow.
The flow is now much simpler: you can follow from function to function directly, the functions are smaller, and the concurrency model is naive (just run everything together and let the lock handle conflicting shared resources).
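That "naive" concurrency model can be sketched roughly as follows. This is an illustration with hypothetical names (handle_update, run_all), using a plain per-destination asyncio.Lock in place of the PR's HierarchicalLock:

```python
import asyncio

async def handle_update(update_id, dst_path, locks, log):
    """Run one data update; serialize only with updates sharing dst_path."""
    # A plain per-destination asyncio.Lock stands in for the hierarchical lock.
    lock = locks.setdefault(dst_path, asyncio.Lock())
    async with lock:  # conflicting destinations wait; others proceed
        log.append(f"fetch+store {update_id} -> {dst_path}")
        await asyncio.sleep(0)  # simulate fetch + store I/O

async def run_all():
    locks, log = {}, []
    # "Run everything together": one task per update, no ordering queue.
    await asyncio.gather(
        handle_update("u1", "/users", locks, log),
        handle_update("u2", "/roles", locks, log),
        handle_update("u3", "/users", locks, log),
    )
    return log
```

Here u1 and u3 serialize on "/users" while u2 proceeds independently.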

Contributor:

This is somewhat simpler now, I agree; good job.
It's still worth documenting this. You can take this line that you wrote here:

> the concurrency model is naive (just run everything together and let the lock handle conflicting shared resources)

And use it as the doc, and maybe elaborate a little bit.
Please don't argue on this; it would be more text than adding a few comments.

self._callbacks_reporter.report_update_results(
whole_report, extra_callbacks
)
)

async def _fetch_and_save_data(
Contributor:

Fetching and storing should be two separate functions. If you combine them into one function like here, it should call two child functions, one for each, and have almost no code in itself.
Separation of duties.

Collaborator (Author):

It mostly is: first it calls self._data_fetcher.handle_url(), then self._store_fetched_data() with the result.
The complex error handling is obscuring this flow; I'll try to extract that to another function.

:return: The data entry report
"""
try:
result = await self._data_fetcher.handle_url(
Contributor:

You killed the performance of this thing; you made all data fetching serial.
There is no longer a separate task for handling fetching, and the update task is singular.

Contributor:

This is the biggest issue here

Collaborator (Author):

No, data updates from the root are done concurrently, with a lock on each destination. Non-conflicting destinations (the common case) are handled in parallel.

See line 170:

self._tasks.create_task(self._update_policy_data(update))

We await the lock for this dst, then immediately start fetching the data, then set it in the policy store. This flow runs concurrently across all data updates.

https://github.com/permitio/opal/pull/745/files/06a3324f1adb22794cbdeb0f15366d9a988e0294#diff-16614de9db264ea3ec0d36343b453cc9881371fe95ff4a862267eaaf24c563c2R170

Contributor:

You are right; I'm sorry I missed this (in my defence, the diff here is extremely hard to read).

orweis (Contributor) left a comment:

Few more comments

# Fetching should be concurrent, but storing should be done in the original order
store_queue_number = await self._updates_storing_queue.take_a_number()
self._tasks.add_task(self._update_policy_data(update, store_queue_number))
self._tasks.create_task(self._update_policy_data(update))
Contributor:

This could also use a comment explaining the async task flow here, i.e. which aspects are concurrent and which are not, and how this interplays with the lock mechanism.


Steps:
1. Iterate over the DataUpdate entries.
2. For each entry, check if any of its topics match our client's topics.
3. Acquire a lock for the destination path, so we don't overwrite concurrently.
Contributor:

Suggested change:
- 3. Acquire a lock for the destination path, so we don't overwrite concurrently.
+ 3. Acquire a lock for the destination path, so we don't fetch and overwrite concurrently.
+    - Note: This means that fetches that could technically happen concurrently wait on one another.
+      This can be improved with a Fetcher-Writer Lock (à la Reader-Writer Lock) pattern.
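The suggested Fetcher-Writer pattern, sketched as an assumption (not code from this PR): like a Reader-Writer lock, any number of fetches may proceed concurrently, while a write waits for them and then excludes everything:

```python
import asyncio
from contextlib import asynccontextmanager

class FetcherWriterLock:
    """Reader-Writer style lock: many concurrent 'fetchers', one exclusive
    'writer'. Hypothetical sketch of the suggested pattern."""

    def __init__(self):
        self._fetchers = 0        # fetches currently in flight
        self._writing = False     # a write holds exclusive access
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def fetching(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._fetchers += 1
        try:
            yield
        finally:
            async with self._cond:
                self._fetchers -= 1
                self._cond.notify_all()

    @asynccontextmanager
    async def writing(self):
        async with self._cond:
            # Wait until no fetch is in flight and no other write is active.
            await self._cond.wait_for(
                lambda: self._fetchers == 0 and not self._writing
            )
            self._writing = True
        try:
            yield
        finally:
            async with self._cond:
                self._writing = False
                self._cond.notify_all()
```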

provided, generate one for tracking/logging.

Note:
Line 172 explanation:
Contributor:

LOL

Comment on lines +428 to +430
"""Calculates a SHA-256 hash of the given data. If 'data' is not a
string, it is first serialized to JSON. Returns an empty string on
failure.
Contributor:

This should explain the meaning, not just what it does.

Suggested change:
- """Calculates a SHA-256 hash of the given data. If 'data' is not a
  string, it is first serialized to JSON. Returns an empty string on
  failure.
+ """Calculates a SHA-256 hash of the given data, to be used to identify the updates (e.g. in logging reports on the transactions). If 'data' is not a
  string, it is first serialized to JSON. Returns an empty string on
  failure.
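The behavior described by that docstring can be sketched as follows (an approximation for illustration, not the PR's exact helper):

```python
import hashlib
import json

def calc_hash(data) -> str:
    """SHA-256 hash of the data, used to identify updates (e.g. in logging
    reports on the transactions). Non-string data is serialized to JSON
    first; returns an empty string on failure."""
    try:
        if not isinstance(data, str):
            # sort_keys makes the hash stable across dict key orderings
            data = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data.encode("utf-8")).hexdigest()
    except Exception:
        return ""
```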

orweis (Contributor) left a comment:

Great and important work.

A few last comments placed for final touches.

Celebration:
The Hierarchical Lock mechanism is very elegant.

Lessons for next time:
Keep it more focused on the core task, and add comments.
