Skip to content

Commit

Permalink
Always use fully expanded FQME throughout .clearing
Browse files Browse the repository at this point in the history
Since crypto backends now also may expand an FQME like `xbteur.kraken`
-> `xbteur.spot.kraken` (by filling in the venue token), we need to use
this identifier when looking up per-market order dialogs or submitting
new requests. The simple fix is to simply look up that expanded from
from the `Feed.flumes` table which is always keyed by the `MktPair.fqme:
str` - the expanded form.
  • Loading branch information
goodboy committed Jun 20, 2023
1 parent caae84f commit d06cf7d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
22 changes: 12 additions & 10 deletions piker/clearing/_ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,10 @@ def mk_paper_ep():
if trades_endpoint:
break
else:
raise RuntimeError(
log.warning(
f'No live trading EP found: {brokermod.name}?'
)
exec_mode: str = 'paper'

if (
trades_endpoint is not None
Expand All @@ -408,9 +409,6 @@ def mk_paper_ep():
trades_endpoint,
)

else:
exec_mode: str = 'paper'

@acm
async def maybe_open_paper_ep():
if exec_mode == 'paper':
Expand Down Expand Up @@ -523,6 +521,7 @@ class Router(Struct):
] = defaultdict(set)

# TODO: mapping of ems dialog ids to msg flow history
# - use the new ._util.OrderDialogs?
# msgflows: defaultdict[
# str,
# ChainMap[dict[str, dict]],
Expand Down Expand Up @@ -641,6 +640,9 @@ async def open_trade_relays(
loglevel=loglevel,
) as feed,
):
# extract expanded fqme in case input was of a less
# qualified form, eg. xbteur.kraken -> xbteur.spot.kraken
fqme: str = list(feed.flumes.keys())[0]
brokername, _, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername]
broker = brokermod.name
Expand Down Expand Up @@ -675,7 +677,7 @@ async def open_trade_relays(

client_ready = trio.Event()
task_status.started(
(relay, feed, client_ready)
(fqme, relay, feed, client_ready)
)

# sync to the client side by waiting for the stream
Expand Down Expand Up @@ -1468,13 +1470,13 @@ async def cached_mngr(
loglevel: str = 'info',
):

relay, feed, client_ready = await _router.nursery.start(
fqme, relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays,
fqme,
exec_mode,
loglevel,
)
yield relay, feed, client_ready
yield fqme, relay, feed, client_ready

async with tractor.trionics.maybe_open_context(
acm_func=cached_mngr,
Expand All @@ -1487,13 +1489,13 @@ async def cached_mngr(
key=cache_on_fqme_unless_paper,
) as (
cache_hit,
(relay, feed, client_ready)
(fqme, relay, feed, client_ready)
):
if cache_hit:
log.info(f'Reusing existing trades relay for {fqme}:\n'
f'{relay}\n')

yield relay, feed, client_ready
yield fqme, relay, feed, client_ready


@tractor.context
Expand Down Expand Up @@ -1576,7 +1578,7 @@ async def _emsd_main(
fqme,
exec_mode,
loglevel,
) as (relay, feed, client_ready):
) as (fqme, relay, feed, client_ready):

brokerd_stream = relay.brokerd_stream
dark_book = _router.get_dark_book(broker)
Expand Down
2 changes: 1 addition & 1 deletion piker/clearing/_paper_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ async def open_trade_dialog(
if fqme:
bs_fqme, _, broker = fqme.rpartition('.')
mkt, _ = await brokermod.get_mkt_info(bs_fqme)
mkt_by_fqme[fqme] = mkt
mkt_by_fqme[mkt.fqme] = mkt

# for each sym in the ledger load it's `MktPair` info
for tid, txdict in ledger.data.items():
Expand Down
3 changes: 0 additions & 3 deletions piker/clearing/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,3 @@ def pop(
'''
return self._flows.pop(oid)



0 comments on commit d06cf7d

Please sign in to comment.