-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persistent data feeds #158
Conversation
Supervise da daemons.
Move all feed/stream agnostic logic and shared mem writing into a new set of routines inside the ``data`` sub-package. This lets us move toward a more standard API for broker and data backends to provide cache-able persistent streams to client apps. The data layer now takes care of - starting a single background brokerd task to start a stream for as symbol if none yet exists and register that stream for later lookups - the existing broker backend actor is now always re-used if possible if it can be found in a service tree - synchronization with the brokerd stream's startup sequence is now oriented around fast startup concurrency such that client code gets a handle to historical data and quote schema as fast as possible - historical data loading is delegated to the backend more formally by starting a ``backfill_bars()`` task - write shared mem in the brokerd task and only destruct it once requested either from the parent actor or further clients - fully de-duplicate stream data by using a dynamic pub-sub strategy where new clients register for copies of the same quote set per symbol This new API is entirely working with the IB backend; others will need to be ported. That's to come shortly.
Avoid bothering with a trio event and expect the caller to do manual shm registering with the write loop. Provide OHLC sample period indexing through a re-branded pub-sub func ``iter_ohlc_periods()``.
@@ -87,6 +88,18 @@ async def open_pikerd( | |||
yield _services | |||
|
|||
|
|||
@asynccontextmanager | |||
async def maybe_open_runtime( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somewhat hacky and I wonder if we should offer this from tractor
itself?
The trick is being able to ping the actor registry when you're code can run as a root out of process tree.
dname, | ||
enable_modules=_data_mods + [brokermod.__name__], | ||
loglevel=loglevel, | ||
**tractor_kwargs | ||
) | ||
|
||
# TODO: so i think this is the perfect use case for supporting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link with goodboy/tractor#53, pretty sure the context entry / exit sync stuff would work amazing for this.
shm.push(last) | ||
|
||
# broadcast the buffer index step | ||
# yield {'index': shm._last.value} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure why this comment is here.
|
||
|
||
@tractor.stream | ||
async def iter_ohlc_periods( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is effectively what the simplest dynamic (stream only exists while there is at least one subscriber) pub-sub system looks like.
I'm tempted to entirely drop the stuff in tractor.msg.pub
as the subscription API it self seems more confusing than this (get_topics()
, topics=List[str]
, and the like).
I think a thorough review and refactor with the questrade
backend (which has a poll-with-multiple-symbols style producer system) will probably illuminate what should be discarded.
@@ -87,16 +88,19 @@ def current_screen() -> QtGui.QScreen: | |||
assert screen, "Wow Qt is dumb as shit and has no screen..." | |||
return screen | |||
|
|||
# XXX: pretty sure none of this shit works | |||
|
|||
# XXX: pretty sure none of this shit works on linux as per: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shoehorn alert.
Lol merged into wrong branch 😂 |
This is a rework on data feed machinery to allow for persistent real-time feeds to remain running after created by some data client where that client detaches but we want to keep the feed live for later use.
For now this means spawning a background feed task which also writes to shared memory buffers and can be accessed by any (returning) data client. This has the immediately useful benefit of allowing for super fast chart loading for pre-existing feeds.
Try it out by,
pikerd
piker -b kraken XBTUSD
Both the
ib
andkraken
backends have been ported thus far.Questrade can likely come in a new PR since the single-connection-multi-symbol-subscription style can also be accomplished with
kraken
's websocket feed and we'll probably want to iron out details of how to request such things with both backends in view.Some other stuff got slapped in here, namely engaging the Qt hidpi detection on windows (which reportedly works?)