Skip to content

Commit

Permalink
Adjust to per-fqsn-oriented Flume lookups throughout
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Nov 10, 2022
1 parent 41f47d2 commit 54d7d0d
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 54 deletions.
23 changes: 14 additions & 9 deletions piker/clearing/_ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from ..data.feed import (
Feed,
Flume,
maybe_open_feed,
)
from ..ui._notify import notify_from_ems_status_msg
Expand Down Expand Up @@ -523,13 +524,14 @@ async def open_trade_relays(
maybe_open_feed(
[fqsn],
loglevel=loglevel,
) as (feed, quote_stream),
) as feed,
):
brokermod = feed.mod
broker = brokermod.name

# XXX: this should be initial price quote from target provider
first_quote: dict = feed.first_quotes[fqsn]
flume = feed.flumes[fqsn]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']

Expand All @@ -547,14 +549,16 @@ async def open_trade_relays(
clear_dark_triggers,
self,
relay.brokerd_stream,
quote_stream,
flume.stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)

client_ready = trio.Event()
task_status.started((relay, feed, client_ready))
task_status.started(
(relay, feed, client_ready)
)

# sync to the client side by waiting for the stream
# connection setup before relaying any existing live
Expand Down Expand Up @@ -1014,7 +1018,7 @@ async def process_client_order_cmds(
brokerd_order_stream: tractor.MsgStream,

fqsn: str,
feed: Feed,
flume: Flume,
dark_book: DarkBook,
router: Router,

Expand Down Expand Up @@ -1212,7 +1216,7 @@ async def process_client_order_cmds(
'size': size,
'exec_mode': exec_mode,
'action': action,
'brokers': brokers, # list
'brokers': _, # list
} if (
# "DARK" triggers
# submit order to local EMS book and scan loop,
Expand All @@ -1234,12 +1238,12 @@ async def process_client_order_cmds(
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.rt_shm.array[-1]['close']
last = flume.rt_shm.array[-1]['close']

pred = mk_check(trigger_price, last, action)

spread_slap: float = 5
min_tick = feed.symbols[fqsn].tick_size
min_tick = flume.symbol.tick_size

if action == 'buy':
tickfilter = ('ask', 'last', 'trade')
Expand Down Expand Up @@ -1452,11 +1456,12 @@ async def _emsd_main(
# start inbound (from attached client) order request processing
# main entrypoint, run here until cancelled.
try:
flume = feed.flumes[fqsn]
await process_client_order_cmds(
client_stream,
brokerd_stream,
fqsn,
feed,
flume,
dark_book,
_router,
)
Expand Down
2 changes: 1 addition & 1 deletion piker/clearing/_paper_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ async def trades_dialogue(
)

# paper engine simulator clearing task
await simulate_fills(feed.stream, client)
await simulate_fills(feed.streams[broker], client)


@asynccontextmanager
Expand Down
23 changes: 12 additions & 11 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)

import numpy as np
import pyqtgraph as pg
import trio
from trio_typing import TaskStatus
import tractor
Expand All @@ -35,7 +34,9 @@
from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data.feed import (
Flume,
)
from ..data._sharedmem import ShmArray
from ..data._sampling import _default_delay_s
from ..data._source import Symbol
Expand Down Expand Up @@ -79,7 +80,7 @@ async def filter_quotes_by_sym(
async def fsp_compute(

symbol: Symbol,
feed: Feed,
flume: Flume,
quote_stream: trio.abc.ReceiveChannel,

src: ShmArray,
Expand Down Expand Up @@ -107,7 +108,7 @@ async def fsp_compute(
filter_quotes_by_sym(fqsn, quote_stream),

# XXX: currently the ``ohlcv`` arg
feed.rt_shm,
flume.rt_shm,
)

# Conduct a single iteration of fsp with historical bars input
Expand Down Expand Up @@ -310,12 +311,12 @@ async def cascade(
# needs to get throttled the ticks we generate.
# tick_throttle=60,

) as (feed, quote_stream):
symbol = feed.symbols[fqsn]
) as feed:

flume = feed.flumes[fqsn]
symbol = flume.symbol
assert src.token == flume.rt_shm.token
profiler(f'{func}: feed up')

assert src.token == feed.rt_shm.token
# last_len = new_len = len(src.array)

func_name = func.__name__
Expand All @@ -327,8 +328,8 @@ async def cascade(

fsp_compute,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
flume=flume,
quote_stream=flume.stream,

# shm
src=src,
Expand Down Expand Up @@ -430,7 +431,7 @@ async def poll_and_sync_to_step(

# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream(
async with flume.index_stream(
int(delay_s)
) as istream:

Expand Down
6 changes: 4 additions & 2 deletions piker/ui/_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,14 +915,16 @@ def __init__(
def resume_all_feeds(self):
try:
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
for flume in feed.flumes.values():
self.linked.godwidget._root_n.start_soon(flume.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
raise

def pause_all_feeds(self):
for feed in self._feeds.values():
self.linked.godwidget._root_n.start_soon(feed.pause)
for flume in feed.flumes.values():
self.linked.godwidget._root_n.start_soon(flume.pause)

@property
def view(self) -> ChartView:
Expand Down
44 changes: 25 additions & 19 deletions piker/ui/_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from ..data.feed import (
open_feed,
Feed,
Flume,
)
from ..data.types import Struct
from ._axes import YAxisLabel
Expand Down Expand Up @@ -228,7 +229,7 @@ async def graphics_update_loop(

nurse: trio.Nursery,
godwidget: GodWidget,
feed: Feed,
flume: Flume,
wap_in_history: bool = False,
vlm_chart: Optional[ChartPlotWidget] = None,

Expand All @@ -255,8 +256,8 @@ async def graphics_update_loop(
fast_chart = linked.chart
hist_chart = godwidget.hist_linked.chart

ohlcv = feed.rt_shm
hist_ohlcv = feed.hist_shm
ohlcv = flume.rt_shm
hist_ohlcv = flume.hist_shm

# update last price sticky
last_price_sticky = fast_chart._ysticks[fast_chart.name]
Expand Down Expand Up @@ -347,9 +348,9 @@ async def increment_history_view():
'i_last_append': i_last,
'i_last': i_last,
}
_, hist_step_size_s, _ = feed.get_ds_info()
_, hist_step_size_s, _ = flume.get_ds_info()

async with feed.index_stream(
async with flume.index_stream(
# int(hist_step_size_s)
# TODO: seems this is more reliable at keeping the slow
# chart incremented in view more correctly?
Expand Down Expand Up @@ -393,7 +394,7 @@ async def increment_history_view():
nurse.start_soon(increment_history_view)

# main real-time quotes update loop
stream: tractor.MsgStream = feed.stream
stream: tractor.MsgStream = flume.stream
async for quotes in stream:

ds.quotes = quotes
Expand Down Expand Up @@ -813,13 +814,13 @@ def graphics_update_cycle(
async def link_views_with_region(
rt_chart: ChartPlotWidget,
hist_chart: ChartPlotWidget,
feed: Feed,
flume: Flume,

) -> None:

# these value are be only pulled once during shm init/startup
izero_hist = feed.izero_hist
izero_rt = feed.izero_rt
izero_hist = flume.izero_hist
izero_rt = flume.izero_rt

# Add the LinearRegionItem to the ViewBox, but tell the ViewBox
# to exclude this item when doing auto-range calculations.
Expand All @@ -846,7 +847,7 @@ async def link_views_with_region(
# poll for datums load and timestep detection
for _ in range(100):
try:
_, _, ratio = feed.get_ds_info()
_, _, ratio = flume.get_ds_info()
break
except IndexError:
await trio.sleep(0.01)
Expand Down Expand Up @@ -977,8 +978,7 @@ async def display_symbol_data(
group_key=True
)

first_fqsn = fqsns[0]

feed: Feed
async with open_feed(
fqsns,
loglevel=loglevel,
Expand All @@ -988,11 +988,17 @@ async def display_symbol_data(
tick_throttle=_quote_throttle_rate,

) as feed:
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm

symbol = feed.symbols[first_fqsn]
fqsn = symbol.front_fqsn()
# TODO: right now we only show one symbol on charts, but
# overlays are coming muy pronto guey..
assert len(feed.flumes) == 1
flume = list(feed.flumes.values())[0]

ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm

symbol = flume.symbol
fqsn = symbol.fqsn

step_size_s = 1
tf_key = tf_in_1s[step_size_s]
Expand All @@ -1012,7 +1018,7 @@ async def display_symbol_data(
hist_linked._symbol = symbol
hist_chart = hist_linked.plot_ohlc_main(
symbol,
feed.hist_shm,
hist_ohlcv,
# in the case of history chart we explicitly set `False`
# to avoid internal pane creation.
# sidepane=False,
Expand Down Expand Up @@ -1100,7 +1106,7 @@ async def display_symbol_data(
graphics_update_loop,
ln,
godwidget,
feed,
flume,
wap_in_history,
vlm_chart,
)
Expand All @@ -1124,7 +1130,7 @@ async def display_symbol_data(
await link_views_with_region(
ohlc_chart,
hist_chart,
feed,
flume,
)

mode: OrderMode
Expand Down
15 changes: 9 additions & 6 deletions piker/ui/_position.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
from ..clearing._allocate import Allocator
from ..pp import Position
from ..data._normalize import iterticks
from ..data.feed import Feed
from ..data.feed import (
Feed,
Flume,
)
from ..data.types import Struct
from ._label import Label
from ._lines import LevelLine, order_line
Expand All @@ -64,7 +67,7 @@

async def update_pnl_from_feed(

feed: Feed,
flume: Flume,
order_mode: OrderMode, # noqa
tracker: PositionTracker,

Expand Down Expand Up @@ -95,7 +98,7 @@ async def update_pnl_from_feed(

# real-time update pnl on the status pane
try:
async with feed.stream.subscribe() as bstream:
async with flume.stream.subscribe() as bstream:
# last_tick = time.time()
async for quotes in bstream:

Expand Down Expand Up @@ -390,12 +393,12 @@ def display_pnl(
mode = self.order_mode
sym = mode.chart.linked.symbol
size = tracker.live_pp.size
feed = mode.quote_feed
flume: Feed = mode.feed.flumes[sym.fqsn]
pnl_value = 0

if size:
# last historical close price
last = feed.rt_shm.array[-1][['close']][0]
last = flume.rt_shm.array[-1][['close']][0]
pnl_value = copysign(1, size) * pnl(
tracker.live_pp.ppu,
last,
Expand All @@ -408,7 +411,7 @@ def display_pnl(
_pnl_tasks[fqsn] = True
self.order_mode.nursery.start_soon(
update_pnl_from_feed,
feed,
flume,
mode,
tracker,
)
Expand Down
Loading

0 comments on commit 54d7d0d

Please sign in to comment.