Feed caching #212

Merged: goodboy merged 28 commits into master from feed_caching on Sep 1, 2021

Conversation

@goodboy (Contributor) commented on Aug 10, 2021:

Initial data feed caching over piker.data.feed.open_feed() using the new maybe_open_feed().

Adds a new _cacheables.py which contains a bunch of helpers for cache-y things.
This relies on goodboy/tractor#229 in order to give multiple actor-local task consumers broadcast access to quote streams.

Putting this up to get eyes on it and see if there's any reason not to start building streaming apis under this paradigm.
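A rough sketch of the intended consumer-side usage; the broker name, symbol and the exact `maybe_open_feed()` signature here are assumptions, not the PR's final API:

```python
from piker.data.feed import maybe_open_feed  # added in this PR

async def consume_quotes() -> None:
    # the first entrant in this actor allocates the feed via
    # open_feed(); later entrants get the cached Feed instance plus
    # broadcast access to the already-running quote stream
    async with maybe_open_feed(
        'kraken',     # hypothetical broker backend name
        ['xbtusd'],   # hypothetical symbol list
    ) as feed:
        async for quotes in feed.stream:
            print(quotes)
```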

Still TODO:

  • we should also cache the Feed.index_stream() per actor, which makes me wonder if we should provide new Feed instances on cache hits, or if we can just wrap the cached one and override ._index_stream (or is that too mutate-y?). Ended up using the new maybe_cache_ctx() mngr introduced in this PR.
  • attaching to a data feed currently registers the client stream with the sample and broadcast loop; would there be a benefit to switching from this brokerd-always-pushes model to a tasks-always-pull model here?
    • i'm thinking no
    • the only thing that might be handy is a re-impl of uniform_rate_send as a broadcast_receiver() potentially? in which case i think we can drop all the timing logic and just let sent quotes queue up and then pull on a fixed period? i may be thinking about this wrong..
  • feed "pausing" support which allows sending a pause/resume message to the endpoint to add/remove the subscription dynamically
  • taking the Feed api to our fsp subsystem (this will likely get delayed to a new PR) -> Expose FSP streams as Feeds #216

@goodboy added the data-layer (real-time and historical data processing and storage), (sub-)systems (general sw design and eng) and perf (efficiency and latency optimization) labels on Aug 10, 2021
@goodboy requested a review from guilledk on August 10, 2021 at 21:09
# maybe_open_ctx() below except it uses an async exit stack.
# ideally we pick one or the other.
@asynccontextmanager
async def open_cached_client(
@goodboy (author) commented on Aug 10, 2021:

this is old code that i think should be ported to the maybe_open_ctx() below but for broker backend client instances.
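A rough sketch of what that port might look like, assuming `maybe_open_ctx()` ends up in the new `_cacheables.py` and yields a `(cache_hit, value)` pair keyed per broker (the signature and helper imports are assumptions):

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

from piker._cacheables import maybe_open_ctx  # assumed module path
from piker.brokers import get_brokermod       # assumed existing helper

@asynccontextmanager
async def open_cached_client(
    brokername: str,
) -> AsyncIterator[Any]:
    # cache one client instance per broker backend, per actor
    brokermod = get_brokermod(brokername)
    async with maybe_open_ctx(
        key=brokername,
        mngr=brokermod.get_client(),  # assumed backend client ctx mngr
    ) as (cache_hit, client):
        yield client
```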

@goodboy (author):

boo yeah. worked like a charm 🏄🏼

def get_and_use() -> AsyncIterable[T]:
# key error must bubble here
feed = cache.ctxs[key]
log.info(f'Reusing cached feed for {key}')
@goodboy (author):

i guess this should use some representation format instead of hard-coding "feed" 😂

if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
@goodboy (author):

This is the part that requires goodboy/tractor#229 tokio style broadcasting.
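In practice that means multiple actor-local tasks can each take their own subscription onto the one underlying quote stream, roughly like this (the task names are made up):

```python
import trio

async def fan_out_locally(feed) -> None:
    async def watch(name: str) -> None:
        # each task gets its own tokio-style broadcast receiver over
        # the same underlying quote stream
        async with feed.stream.subscribe() as bstream:
            async for quote in bstream:
                print(name, quote)

    async with trio.open_nursery() as n:
        n.start_soon(watch, 'chart')
        n.start_soon(watch, 'fsp')
```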

@goodboy (author) commented on Aug 10, 2021:

Just updated infect_asyncio in tractor to match. so we shud be guuuudd.

@goodboy (author) commented on Aug 11, 2021:

Probably also worth keeping 👀 on down the road: the cachetools project.

@goodboy (author) commented on Aug 16, 2021:

Added Feed.pause()/.resume() so we can use it when charts are switched, to avoid brokerd pushing more streams than necessary and to avoid unseen draw cycles on charts not in focus 😎
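Roughly how a chart might drive this on focus changes (the hook name is hypothetical; only `Feed.pause()`/`.resume()` come from this PR):

```python
async def on_chart_focus_change(feed, in_focus: bool) -> None:
    if in_focus:
        # ask brokerd to re-add our quote subscription
        await feed.resume()
    else:
        # stop pushes for this feed and skip unseen draw cycles
        await feed.pause()
```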

uid = ctx.chan.uid
fqsn = f'{symbol}.{brokername}'

async for msg in stream:
@goodboy (author) commented on Aug 16, 2021:

This implements "feed pausing" - the beauty of 2 way streamzz 🏄🏼‍♀️
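A rough sketch of what that 2-way control loop likely amounts to; the 'pause'/'resume' message values and the list-style subscriber registry are assumptions (the `.remove(sub)` call mirrors the `attach_feed_bus()` teardown shown further down):

```python
async for msg in stream:
    if msg == 'pause':
        # drop this client from the broadcast subscriber set so
        # brokerd stops pushing quotes to it
        if sub in bus._subscribers[symbol]:
            bus._subscribers[symbol].remove(sub)

    elif msg == 'resume':
        # re-register the client stream for quote pushes
        if sub not in bus._subscribers[symbol]:
            bus._subscribers[symbol].append(sub)
```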

Maybe i've finally learned my lesson that exit stacks and per task ctx
manager caching is just not trionic.. Use the approach we've taken for
the daemon service manager as well: create a process global nursery for
each unique ctx manager we wish to cache and simply tear it down when
the number of consumers goes to zero.

This seems to resolve all prior issues and gets us error-free cached
feeds!
…ed..."

Think this was fixed by passing through `**kwargs` in
`maybe_open_feed()`; the shielding for fsp respawns wasn't being
properly passed through.

This reverts commit 2f1455d.
@goodboy requested a review from iamzoltan on August 31, 2021 at 13:17
ctx_key = id(mngr)

# TODO: does this need to be a tractor "root nursery"?
async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n:
@goodboy (author):

I'm still slightly unclear how the teardown part of lifetime works here; pretty sure it's going to tear down when the first consumer task is complete instead of when the last arriving consumer does.

In order to get the latter behavior we might need to have an actor global nursery that's brought up with the runtime / the consumer process?

Not sure this absolutely must be addressed right now since usually the creator task stays up as long as the app / daemon which is using the feed.

@@ -335,6 +355,31 @@ async def attach_feed_bus(
bus._subscribers[symbol].remove(sub)


@asynccontextmanager
async def open_sample_step_stream(
@goodboy (author):

Allows us to actor-cache the OHLC step event stream per delay (since you'll likely want the same event for all local consumers).
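Something like the following, assuming it leans on the same cached-ctx helper and keys on the sample period; the helper import, the remote endpoint name and the argument names are all guesses:

```python
from contextlib import asynccontextmanager

import tractor
from piker._cacheables import maybe_open_ctx        # assumed module path
from piker.data._sampling import iter_ohlc_periods  # assumed endpoint

@asynccontextmanager
async def open_sample_step_stream(
    portal: tractor.Portal,  # portal to the data-feed actor
    delay_s: int,            # sample period in seconds
):
    # all local consumers of e.g. the 1s step event share one remote
    # stream, cached per delay
    async with maybe_open_ctx(
        key=('sample_step', delay_s),
        mngr=portal.open_stream_from(
            iter_ohlc_periods,
            delay_s=delay_s,
        ),
    ) as (cache_hit, istream):
        yield istream
```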

brokername,
[sym],
loglevel=loglevel,
**kwargs,
@goodboy (author):

This `**kwargs` pass-through was critical...

@guilledk (Contributor) left a review:

Love to see the new _broadcast machinery. Also _cacheables looking sexy!

In order to ensure the lifetime of the feed can in fact be kept open
until the last consumer task has completed we need to maintain
a lifetime which is hierarchically greater than all consumer tasks.

This solution is somewhat hacky but seems to work well: we just use the
`tractor` actor's "service nursery" (the one normally used to invoke rpc
tasks) to launch the task which will start and keep open the target
cached async context manager. To make this more "proper" we may want to
offer a "root nursery" in all piker actors that is exposed through some
singleton api or even introduce a public api for it into `tractor`
directly.
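A self-contained sketch of that scheme using plain `trio` primitives, with a caller-supplied long-lived nursery standing in for the tractor service nursery (none of these names are the PR's actual ones):

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncContextManager, Hashable

import trio

_cache: dict[Hashable, tuple[Any, trio.Event]] = {}
_users: dict[Hashable, int] = {}
_lock = trio.Lock()

@asynccontextmanager
async def maybe_open_cached(
    key: Hashable,
    mngr: AsyncContextManager,
    service_nursery: trio.Nursery,
):
    '''Yield ``(cache_hit, value)``; the wrapped ``mngr`` is only
    torn down once the *last* consumer task exits.
    '''
    async with _lock:
        if key not in _cache:

            async def _keeper(task_status=trio.TASK_STATUS_IGNORED):
                # hold the target ctx mngr open in a task living in a
                # longer-lived ("service") nursery
                async with mngr as value:
                    teardown = trio.Event()
                    _cache[key] = (value, teardown)
                    task_status.started()
                    await teardown.wait()

            await service_nursery.start(_keeper)
            cache_hit = False
        else:
            cache_hit = True

        _users[key] = _users.get(key, 0) + 1
        value, teardown = _cache[key]

    try:
        yield cache_hit, value
    finally:
        async with _lock:
            _users[key] -= 1
            if not _users[key]:
                del _cache[key], _users[key]
                teardown.set()  # lets the keeper task exit `mngr`
```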
@iamzoltan (Contributor) left a review:

Looks kosher to me. Digging the improvements!



@asynccontextmanager
async def open_cached_client(
@goodboy (author):

not bad right?

'''
lock = trio.Lock()
users: int = 0
values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {}
@goodboy (author):

woops, the AsyncExitStack is tossed now so this annotation is stale.

@goodboy merged commit 37d94fb into master on Sep 1, 2021
@goodboy deleted the feed_caching branch on September 1, 2021 at 14:25