-
Notifications
You must be signed in to change notification settings - Fork 164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iroh-net): Add a Watchable struct for use in the Endpoint API #2806
base: main
Are you sure you want to change the base?
Conversation
This adds a basic Watchable struct for use on the Endpoint API.
Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh/pr/2806/docs/iroh/ Last updated: 2024-11-21T13:41:33Z |
@flub |
@mepi262 the current implementation is racy and needs fixing. What makes you interested in this? |
The reason in which I'm interested is followings.
I wish you good health, good luck and success. |
iroh-net/src/util/watchable.rs
Outdated
// TODO(flub): Pretty sure this can miss a write because the epoch and wakers are | ||
// separate: | ||
// - thread 1 runs poll_next | ||
// - thread 2 runs set | ||
// 1. thread 1: load epoch | ||
// 2. thread 2: lock value, replace value, unlock value | ||
// 3. thread 2: store epoch | ||
// 4. thread 2: lock wakers, drain wakers, unlock wakers | ||
// 5. thread 1: lock wakers, install waker, unlock wakers | ||
// | ||
// I believe the epoch and wakers need to be stored in the same RwLock. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're completely right about the wakers possibly missing a wakeup once a value is available.
I've tested this implementation of Watchable with loom
, and can confirm that it gets stuck waiting for watcher.initialized()
in some cases, even though there's a watchable.set()
call.
(Interestingly, reproducing it practically requires loom usage, since it's very timing-sensitive and doesn't reproduce easily with running the same test case in a tight loop a couple thousand times.)
You're incorrect about the fact that the epoch and wakers needing to be stored on the same RwLock.
Similar to this example, after writing the waker, you can re-check the epoch to see if there was a write in-between.
Adding this second check, it passes loom.
Made some changes:
For anyone who's interested in running the loom test, you get some fun output like this:
|
Should we (well, I) maybe address #2860 (comment) in here too?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Any reason this is still draft?
#[derive(thiserror::Error, Debug)] | ||
pub enum Error { | ||
#[error("Watch lost connection to underlying Watchable, it was dropped")] | ||
WatchableClosed, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any point to having this as an enum instead of struct WatchableClosed
?
I assume it would be extensibility, but then it should be marked as non-exhaustive? Doing that also makes using it harder though. So my vote is to make this a struct WatchableClosed
for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah thought about that, too. Thanks for chiming in. I'll call it Disconnected
similar to what it's called in the watchable
crate.
/// Note that the `Option` is only there to allow initialization. | ||
/// Once initialized the value can never be cleared again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment now belongs in the State
struct?
if let Some(value) = state.value.clone() { | ||
// Once initialized our Option is never set back to None, but nevertheless | ||
// this code is safer without relying on that invariant. | ||
return Poll::Ready((epoch, value)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens to our installed waker now? We might get a spurious wakeup I guess? Which is probably the easiest way to go about this rather than trying to remove our waker again?
impl<T: Clone + Eq> Watcher<T> { | ||
/// Returns the currently held value. | ||
/// | ||
/// Returns `None` if the value was not set yet. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Returns `None` if the value was not set yet. | |
/// Returns `None` if the value was not yet initialised. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You keep spelling "initialized" wrong 😜
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol, it's so hard!
Ok(shared.get()) | ||
} | ||
|
||
/// Returns a future completing once the value is initialized. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Returns a future completing once the value is initialized. | |
/// Returns a future completing once the value is initialized. | |
/// | |
/// The future will complete immediately if the value was already initialized. |
/// used to operate on the most recent value. If the stream is not yet initialized the | ||
/// first item of the stream will not be readily available. | ||
/// | ||
/// Note however that only the last item is stored. If the stream is not polled when an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Note however that only the last item is stored. If the stream is not polled when an | |
/// Note however, that only the last item is stored. If the stream is not polled when an |
WatchNextFut { watcher: self } | ||
} | ||
|
||
/// Returns a stream which will yield an items for the most recent value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Returns a stream which will yield an items for the most recent value. | |
/// Returns a stream which will yield items containing the most recent value. |
WatchNextStream { watcher: self } | ||
} | ||
|
||
/// Returns a stream which will yield an item for changes to the watched value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Returns a stream which will yield an item for changes to the watched value. | |
/// Returns a stream which will yield items for changes to the watched value. |
} | ||
|
||
#[derive(Debug)] | ||
#[repr(transparent)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? I thought this was a thing only needed for C APIs.
|
||
#[derive(Debug)] | ||
#[repr(transparent)] | ||
pub struct WatchNextFut<'a, T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a doc comment
I think this PR just introduces the watchable, follow-up PRs can start using it. |
Ah okay. Well my plan was to learn from the refactor of making iroh-net use this new watchable. And I'm already finding things. E.g. do we want the Watchable to have So, I'm still learning! |
Makes sense as well. No big deal if you'd like to do this all in one place. |
Description
This implements a
Watchable
struct and aWatcher
which provides access to the watched value in several ways, including some streams.Breaking Changes
Not yet, adopting it on the API will break things.
Notes & open questions
Change checklist