diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 967821412..1bb57ae79 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -395,9 +395,10 @@ def mk_paper_ep(): if trades_endpoint: break else: - raise RuntimeError( + log.warning( f'No live trading EP found: {brokermod.name}?' ) + exec_mode: str = 'paper' if ( trades_endpoint is not None @@ -408,9 +409,6 @@ def mk_paper_ep(): trades_endpoint, ) - else: - exec_mode: str = 'paper' - @acm async def maybe_open_paper_ep(): if exec_mode == 'paper': @@ -523,6 +521,7 @@ class Router(Struct): ] = defaultdict(set) # TODO: mapping of ems dialog ids to msg flow history + # - use the new ._util.OrderDialogs? # msgflows: defaultdict[ # str, # ChainMap[dict[str, dict]], @@ -641,6 +640,9 @@ async def open_trade_relays( loglevel=loglevel, ) as feed, ): + # extract expanded fqme in case input was of a less + # qualified form, eg. xbteur.kraken -> xbteur.spot.kraken + fqme: str = list(feed.flumes.keys())[0] brokername, _, _, _ = unpack_fqme(fqme) brokermod = feed.mods[brokername] broker = brokermod.name @@ -675,7 +677,7 @@ async def open_trade_relays( client_ready = trio.Event() task_status.started( - (relay, feed, client_ready) + (fqme, relay, feed, client_ready) ) # sync to the client side by waiting for the stream @@ -1468,13 +1470,13 @@ async def cached_mngr( loglevel: str = 'info', ): - relay, feed, client_ready = await _router.nursery.start( + fqme, relay, feed, client_ready = await _router.nursery.start( _router.open_trade_relays, fqme, exec_mode, loglevel, ) - yield relay, feed, client_ready + yield fqme, relay, feed, client_ready async with tractor.trionics.maybe_open_context( acm_func=cached_mngr, @@ -1487,13 +1489,13 @@ async def cached_mngr( key=cache_on_fqme_unless_paper, ) as ( cache_hit, - (relay, feed, client_ready) + (fqme, relay, feed, client_ready) ): if cache_hit: log.info(f'Reusing existing trades relay for {fqme}:\n' f'{relay}\n') - yield relay, feed, client_ready + yield fqme, relay, feed, client_ready @tractor.context @@ -1576,7 +1578,7 @@ async def _emsd_main( fqme, exec_mode, loglevel, - ) as (relay, feed, client_ready): + ) as (fqme, relay, feed, client_ready): brokerd_stream = relay.brokerd_stream dark_book = _router.get_dark_book(broker) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 4220bf63e..34e7ec58e 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -574,7 +574,7 @@ async def open_trade_dialog( if fqme: bs_fqme, _, broker = fqme.rpartition('.') mkt, _ = await brokermod.get_mkt_info(bs_fqme) - mkt_by_fqme[fqme] = mkt + mkt_by_fqme[mkt.fqme] = mkt # for each sym in the ledger load it's `MktPair` info for tid, txdict in ledger.data.items(): diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index 9015ba69b..9eebf1c4d 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -91,6 +91,3 @@ def pop( ''' return self._flows.pop(oid) - - -