
Persistent data feeds #158

Merged: 32 commits, Apr 6, 2021
Conversation

@goodboy goodboy commented Mar 31, 2021

This is a rework of the data feed machinery to allow persistent real-time feeds to remain running after being created by some data client: that client can detach while we keep the feed live for later use.

For now this means spawning a background feed task which writes to shared memory buffers that any (returning) data client can access. This has the immediately useful benefit of super fast chart loading for pre-existing feeds.
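As a rough illustration of the idea (names like `FeedCache` are invented here, not from this PR, and a plain `deque` stands in for the shared memory buffer): the first client to request a symbol spawns a background task that keeps the buffer filled, and a returning client simply re-attaches to the same buffer instead of restarting the feed.

```python
# Hypothetical sketch of a persistent, cache-able feed: the producer
# task outlives any single client, so re-attaching is instant.
import asyncio
from collections import deque


class FeedCache:
    def __init__(self):
        # symbol -> (background producer task, shared buffer stand-in)
        self._feeds: dict[str, tuple[asyncio.Task, deque]] = {}

    async def attach(self, symbol: str) -> deque:
        if symbol not in self._feeds:
            buf: deque = deque(maxlen=1000)
            task = asyncio.create_task(self._stream(symbol, buf))
            self._feeds[symbol] = (task, buf)
        return self._feeds[symbol][1]

    async def _stream(self, symbol: str, buf: deque):
        # fake quote producer; a real backend would push broker quotes
        i = 0
        while True:
            buf.append({'symbol': symbol, 'tick': i})
            i += 1
            await asyncio.sleep(0.001)


async def main() -> int:
    cache = FeedCache()
    buf = await cache.attach('XBTUSD')
    await asyncio.sleep(0.05)            # first client reads, then "detaches"
    buf2 = await cache.attach('XBTUSD')  # returning client: same live buffer
    assert buf2 is buf
    return len(buf2)                     # pre-existing history, no warm-up


ticks = asyncio.run(main())
print(ticks > 0)
```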

Try it out by:

  • in your first terminal, running pikerd
  • in your second, running piker -b kraken XBTUSD
  • killing your chart
  • re-running the chart command

Both the ib and kraken backends have been ported thus far.
Questrade can likely come in a new PR since the single-connection, multi-symbol-subscription style can also be accomplished with kraken's websocket feed, and we'll probably want to iron out the details of how to request such things with both backends in view.

Some other stuff got slapped in here, namely engaging the Qt hidpi detection on Windows (which reportedly works?).

goodboy and others added 23 commits April 3, 2021 12:34
Move all feed/stream agnostic logic and shared mem writing into a new
set of routines inside the ``data`` sub-package. This lets us move
toward a more standard API for broker and data backends to provide
cache-able persistent streams to client apps.

The data layer now takes care of:
- starting a single background brokerd task to start a stream for a
  symbol if none yet exists, and registering that stream for later lookups
- the existing broker backend actor is now always re-used if possible
  if it can be found in a service tree
- synchronization with the brokerd stream's startup sequence is now
  oriented around fast startup concurrency such that client code gets
  a handle to historical data and quote schema as fast as possible
- historical data loading is delegated to the backend more formally by
  starting a ``backfill_bars()`` task
- write shared mem in the brokerd task and only destruct it once requested
  either from the parent actor or further clients
- fully de-duplicate stream data by using a dynamic pub-sub strategy
  where new clients register for copies of the same quote set per symbol

This new API is entirely working with the IB backend; others will need
to be ported. That's to come shortly.
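The de-duplication point above can be sketched roughly as follows (a minimal stand-in, not the actual tractor-based implementation; `QuoteFanOut` and its methods are invented names): one producer per symbol fans each quote out to every registered subscriber queue, so N clients share a single upstream feed and each gets its own copy of the quote set.

```python
# Hypothetical sketch of dynamic pub-sub quote de-duplication: many
# clients, one upstream stream per symbol.
import asyncio


class QuoteFanOut:
    def __init__(self):
        self._subs: dict[str, list[asyncio.Queue]] = {}

    def subscribe(self, symbol: str) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self._subs.setdefault(symbol, []).append(q)
        return q

    async def publish(self, symbol: str, quote: dict):
        # each subscriber gets an independent copy of the same quote
        for q in self._subs.get(symbol, []):
            await q.put(dict(quote))


async def demo():
    fan = QuoteFanOut()
    a = fan.subscribe('XBTUSD')
    b = fan.subscribe('XBTUSD')
    await fan.publish('XBTUSD', {'last': 50_000.0})
    return await a.get(), await b.get()


qa, qb = asyncio.run(demo())
assert qa == qb and qa is not qb  # same data, independent copies
```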
Avoid bothering with a trio event and expect the caller to do manual shm
registering with the write loop. Provide OHLC sample period indexing
through a re-branded pub-sub func ``iter_ohlc_periods()``.
@@ -87,6 +88,18 @@ async def open_pikerd(
yield _services


@asynccontextmanager
async def maybe_open_runtime(
Somewhat hacky and I wonder if we should offer this from tractor itself?

The trick is being able to ping the actor registry when your code can run as a root outside the process tree.
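The "maybe open" pattern being discussed can be illustrated with plain sockets (this is not tractor's API; the address and names here are assumptions for the sketch): probe for an already-running registry/daemon, attach to it if it answers, and only boot a fresh root runtime when nothing is listening.

```python
# Rough stand-in for "maybe_open_runtime": reuse an existing daemon
# tree if one is reachable, else become the root.
import socket
from contextlib import contextmanager

REGISTRY_ADDR = ('127.0.0.1', 6116)  # hypothetical registry address


def registry_alive(addr=REGISTRY_ADDR, timeout=0.2) -> bool:
    try:
        with socket.create_connection(addr, timeout=timeout):
            return True
    except OSError:
        return False


@contextmanager
def maybe_open_runtime(addr=REGISTRY_ADDR):
    if registry_alive(addr):
        # something is already serving the registry: just attach
        yield 'attached'
    else:
        # stand-in for booting a root runtime + registry
        server = socket.create_server(addr)
        try:
            yield 'root'
        finally:
            server.close()


with maybe_open_runtime() as role:
    print('opened as', role)
```

Nesting works the way you'd want: a second `maybe_open_runtime()` entered inside the first always finds a live registry and attaches rather than trying to become a second root.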

dname,
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
**tractor_kwargs
)

# TODO: so i think this is the perfect use case for supporting

Link with goodboy/tractor#53; pretty sure the context entry/exit sync stuff would work amazingly for this.

shm.push(last)

# broadcast the buffer index step
# yield {'index': shm._last.value}
not sure why this comment is here.



@tractor.stream
async def iter_ohlc_periods(
@goodboy goodboy Apr 5, 2021
This is effectively what the simplest dynamic (stream only exists while there is at least one subscriber) pub-sub system looks like.

I'm tempted to entirely drop the stuff in tractor.msg.pub since the subscription API itself (get_topics(), topics=List[str], and the like) seems more confusing than this.

I think a thorough review and refactor with the questrade backend (which has a poll-with-multiple-symbols style producer system) will probably illuminate what should be discarded.
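The "stream only exists while there is at least one subscriber" behavior boils down to reference counting (a minimal sketch with invented names, not the PR's actual code): the first `subscribe()` lazily spawns the producer task, and the last `unsubscribe()` tears it down.

```python
# Hypothetical sketch of subscriber-counted stream lifetime.
import asyncio


class TopicStream:
    def __init__(self):
        self._count = 0
        self._task = None  # producer task, created lazily

    async def _produce(self):
        while True:
            await asyncio.sleep(0.01)  # pretend to poll a quote source

    def subscribe(self):
        self._count += 1
        if self._task is None:
            # first subscriber: bring the stream into existence
            self._task = asyncio.create_task(self._produce())

    def unsubscribe(self):
        self._count -= 1
        if self._count == 0 and self._task is not None:
            # last subscriber gone: tear the stream down
            self._task.cancel()
            self._task = None


async def demo():
    s = TopicStream()
    s.subscribe()
    s.subscribe()
    running_with_subs = s._task is not None
    s.unsubscribe()
    still_running = s._task is not None   # one subscriber remains
    s.unsubscribe()
    torn_down = s._task is None           # last client left
    return running_with_subs, still_running, torn_down


print(asyncio.run(demo()))
```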

@@ -87,16 +88,19 @@ def current_screen() -> QtGui.QScreen:
assert screen, "Wow Qt is dumb as shit and has no screen..."
return screen

# XXX: pretty sure none of this shit works

# XXX: pretty sure none of this shit works on linux as per:
shoehorn alert.

@goodboy goodboy merged commit 9869b11 into supervise Apr 6, 2021
@goodboy goodboy deleted the cached_feeds branch April 6, 2021 17:10
@goodboy goodboy restored the cached_feeds branch April 6, 2021 17:11
goodboy commented Apr 6, 2021

Lol merged into wrong branch 😂

@goodboy goodboy mentioned this pull request Apr 6, 2021