Feed caching #212
Conversation
piker/_cacheables.py
Outdated
# maybe_open_ctx() below except it uses an async exit stack.
# ideally we pick one or the other.
@asynccontextmanager
async def open_cached_client(
this is old code that i think should be ported to the maybe_open_ctx()
below but for broker backend client instances.
boo yeah. worked like a charm 🏄🏼
piker/_cacheables.py
Outdated
def get_and_use() -> AsyncIterable[T]:
    # key error must bubble here
    feed = cache.ctxs[key]
    log.info(f'Reusing cached feed for {key}')
i guess this log msg should use some generic repr of the cached value instead of hard-coding "feed" 😂
if cache_hit:
    # add a new broadcast subscription for the quote stream
    # if this feed is likely already in use
    async with feed.stream.subscribe() as bstream:
This is the part that requires goodboy/tractor#229 tokio
style broadcasting.
Just updated
Probably also worth keeping 👀 on down the road: the cachetools project.
Added
uid = ctx.chan.uid
fqsn = f'{symbol}.{brokername}'

async for msg in stream:
This implements "feed pausing" - the beauty of 2 way streamzz 🏄🏼♀️
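A rough sketch of what that pausing can look like in the feed-bus endpoint's msg loop (illustrative only, not this PR's exact code; the `stream`, `subscribers`, `symbol` and `sub` names stand in for the surrounding endpoint's locals):

```python
# Hedged sketch: the 2-way stream lets a consumer send `pause`/`resume`
# control msgs back to the feed-bus endpoint, which toggles that
# consumer's quote subscription.

async def serve_subscription_ctl(
    stream,             # bi-directional msg stream from the consumer
    subscribers: dict,  # e.g. the bus's symbol -> subscriber-list table
    symbol: str,
    sub,                # this consumer's subscription entry
) -> None:
    async for msg in stream:
        if msg == 'pause':
            # stop forwarding quotes to this subscriber
            if sub in subscribers[symbol]:
                subscribers[symbol].remove(sub)

        elif msg == 'resume':
            # re-add the subscription so quotes flow again
            if sub not in subscribers[symbol]:
                subscribers[symbol].append(sub)
```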
Try out the new broadcast channels from `tractor` for data feeds we already have cached. Any time there's a cache hit we load the cached feed and just slap a broadcast receiver on it for the local consumer task.
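A minimal sketch of that consumer-side fan-out (names here are illustrative; the `.subscribe()` broadcast API is the one from goodboy/tractor#229 shown in the diff above):

```python
import trio

async def consume_quotes(feed, name: str) -> None:
    # each local consumer task wraps the *same* underlying quote stream
    # in its own broadcast receiver
    async with feed.stream.subscribe() as bstream:
        async for quote in bstream:
            print(f'{name}: {quote}')

async def fan_out(feed) -> None:
    # two tasks in this actor sharing one cached feed
    async with trio.open_nursery() as n:
        n.start_soon(consume_quotes, feed, 'chart')
        n.start_soon(consume_quotes, feed, 'fsp')
```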
Maybe i've finally learned my lesson that exit stacks and per-task ctx manager caching are just not trionic.. Use the approach we've taken for the daemon service manager as well: create a process-global nursery for each unique ctx manager we wish to cache and simply tear it down when the number of consumers goes to zero. This seems to resolve all prior issues and gets us error-free cached feeds!
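A stripped-down sketch of that pattern (an illustration only, not the PR's `_cacheables.py` code; the key scheme, locking, and where the `service_nursery` comes from are simplified assumptions):

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

import trio

# process-global cache state (simplified)
_values: dict[Any, Any] = {}
_users: dict[Any, int] = {}
_release: dict[Any, trio.Event] = {}
_lock = trio.Lock()


async def _keep_open(
    mngr,
    release: trio.Event,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # enter the wrapped mngr inside a long-lived (process-global)
    # nursery task and hold it open until the last consumer leaves
    async with mngr as value:
        task_status.started(value)
        await release.wait()


def _drop_user(key: Any) -> None:
    _users[key] -= 1
    if _users[key] == 0:
        # last consumer left: let the holder task exit the wrapped mngr
        _release.pop(key).set()
        _values.pop(key, None)
        _users.pop(key, None)


@asynccontextmanager
async def maybe_open_ctx(
    key: Any,
    mngr,
    service_nursery: trio.Nursery,
) -> AsyncIterator[tuple[bool, Any]]:
    # yield (cache_hit, value), entering ``mngr`` at most once per key
    async with _lock:
        if key in _values:
            cache_hit = True
            _users[key] += 1
            value = _values[key]
        else:
            cache_hit = False
            _users[key] = 1
            release = _release[key] = trio.Event()
            value = await service_nursery.start(_keep_open, mngr, release)
            _values[key] = value
    try:
        yield cache_hit, value
    finally:
        _drop_user(key)
```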
…ed..." Think this was fixed by passing through `**kwargs` in `maybe_open_feed()`; the shielding for fsp respawns wasn't being properly passed through. This reverts commit 2f1455d.
piker/_cacheables.py
Outdated
ctx_key = id(mngr)

# TODO: does this need to be a tractor "root nursery"?
async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n:
I'm still slightly unclear on how the teardown part of the lifetime works here; pretty sure it's going to tear down when the first consumer task completes instead of when the last-arriving consumer does.
In order to get the latter behavior we might need an actor-global nursery that's brought up with the runtime / the consumer process?
Not sure this absolutely must be addressed right now since usually the creator task stays up as long as the app / daemon which is using the feed.
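A toy illustration of the concern (not this PR's code): if the caching nursery is opened inside the first consumer's own task scope, its lifetime is bounded by that consumer rather than by the last one out.

```python
import trio

async def keep_resource_open() -> None:
    # stands in for the task holding a cached feed/ctx mngr open
    await trio.sleep_forever()

async def first_consumer() -> None:
    async with trio.open_nursery() as n:  # nursery owned by *this* task
        n.start_soon(keep_resource_open)
        await trio.sleep(1)               # first consumer is done early
        n.cancel_scope.cancel()
    # the nursery (and the "cached" resource task) is torn down here,
    # regardless of any later-arriving consumer still using the cache
```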
@@ -335,6 +355,31 @@ async def attach_feed_bus(
bus._subscribers[symbol].remove(sub)


@asynccontextmanager
async def open_sample_step_stream(
Allows us to actor-cache the OHLC step event stream per delay (since you'll likely want the same event for all local consumers).
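A hedged sketch of the shape (signatures and import paths are assumptions on my part; `maybe_open_ctx()` is this PR's generic cache helper and `iter_ohlc_periods` stands in for the sampler-side step-event source):

```python
from contextlib import asynccontextmanager

from piker._cacheables import maybe_open_ctx        # this PR's helper
from piker.data._sampling import iter_ohlc_periods  # assumed location


@asynccontextmanager
async def open_sample_step_stream(
    portal,        # portal to the actor emitting OHLC step events
    delay_s: int,  # step period in seconds, also used as the cache key
):
    # keying on ``delay_s`` means each distinct step period gets exactly
    # one remote event stream per actor, shared by all local consumers
    async with maybe_open_ctx(
        key=('sample_step', delay_s),
        mngr=portal.open_stream_from(
            iter_ohlc_periods,
            delay_s=delay_s,
        ),
    ) as (cache_hit, istream):
        yield istream
```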
brokername,
[sym],
loglevel=loglevel,
**kwargs,
This `**kwargs` pass-through was critical...
Love to see the new `_broadcast` machinery. Also `_cacheables` looking sexy!
In order to ensure the lifetime of the feed can in fact be kept open until the last consumer task has completed, we need to maintain a lifetime which is hierarchically greater than all consumer tasks. This solution is somewhat hacky but seems to work well: we just use the `tractor` actor's "service nursery" (the one normally used to invoke rpc tasks) to launch the task which will start and keep open the target cached async context manager. To make this more "proper" we may want to offer a "root nursery" in all piker actors that is exposed through some singleton api, or even introduce a public api for it into `tractor` directly.
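Roughly, the hack amounts to reaching into the actor runtime for that nursery (a sketch; the private attribute name is an assumption on my part, hence "hacky"):

```python
import tractor
import trio

def get_service_nursery() -> trio.Nursery:
    # the actor's long-lived nursery normally used to run rpc tasks;
    # ``._service_n`` is private runtime state (assumed attr name)
    actor = tractor.current_actor()
    service_n = actor._service_n
    assert service_n, 'tractor runtime not up yet?'
    return service_n
```

A holder task launched there (like the `_keep_open()` task from the earlier sketch) then outlives any individual consumer task in the actor.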
Looks kosher to me. Digging the improvements!


@asynccontextmanager
async def open_cached_client(
not bad right?
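For reference, a hedged sketch of what the ported `open_cached_client()` can look like when it just delegates to the generic cache helper (the key scheme and `maybe_open_ctx()` signature here are assumptions, not the PR's literal code):

```python
from contextlib import asynccontextmanager

from piker._cacheables import maybe_open_ctx  # this PR's generic helper
from piker.brokers import get_brokermod


@asynccontextmanager
async def open_cached_client(brokername: str):
    # one backend client per broker per actor, shared by all local tasks
    brokermod = get_brokermod(brokername)
    async with maybe_open_ctx(
        key=('brokerd_client', brokername),
        mngr=brokermod.get_client(),
    ) as (cache_hit, client):
        yield client
```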
piker/_cacheables.py
Outdated
'''
lock = trio.Lock()
users: int = 0
values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {}
woops, stack is tossed now.
force-pushed from 0ee755d to 4527d4a (Compare)
Initial data feed caching over `piker.data.feed.open_feed()` using the new `maybe_open_feed()`.

Adds a new `_cacheables.py` which contains a bunch of helpers for cache-y things. This relies on goodboy/tractor#229 in order to give multiple actor-local task consumers broadcast access to quote streams.

Putting this up to get eyes on it and see if there's any reason not to start building streaming apis under this paradigm.

Still TODO:
- `Feed.index_stream()` caching per actor, which makes me also wonder if we should provide new `Feed` instances on cache hits? or can we just wrap it and override `._index_stream`, or is that too mutate-y? ended up using the new `maybe_cache_ctx()` mngr introduced in this PR
- `brokerd`-always-pushes to a tasks-always-pull model here? `uniform_rate_send` as a `broadcast_receiver()` potentially? in which case i think we can drop all the timing logic and just let sent quotes queue up and then pull on a fixed period? i may be thinking about this wrong.. (see the sketch below)
- `pause`/`resume` message to the endpoint to add/remove the subscription dynamically
- taking the `Feed` api to our fsp subsystem (this will likely get delayed to a new PR) -> Expose FSP streams as `Feed`s #216
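On the `uniform_rate_send()`-as-`broadcast_receiver()` idea above, a rough sketch of the pull-side version (assumes `bstream` is a broadcast receiver from `.subscribe()` per tractor#229; none of this is implemented in this PR):

```python
import trio

async def pull_at_rate(bstream, on_quote, rate_hz: float = 60) -> None:
    # instead of throttling on the send side, each consumer drains its
    # broadcast receiver on a fixed period and only processes the
    # newest quote seen in that window
    period = 1 / rate_hz
    latest = None
    while True:
        with trio.move_on_after(period):
            async for quote in bstream:
                latest = quote
        if latest is not None:
            await on_quote(latest)
            latest = None
```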