From 74f136cef7b59e9ac558f86a97ed6af3202a0899 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 11:03:32 -0400 Subject: [PATCH] Get order "editing" working fully Turns out the EMS can support this as originally expected: you can update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems which update its cross-dialog (leg) tracking correctly! The issue was a bug in the `editOrderStatus` msg handling and appropriate tracking of the correct `.oid` (ems uid) on the kraken side. This unfortunately required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow tracing table which means the broker daemon is tracking all the msg flow with the ems, though I'm wondering now if this is just good practise anyway and maybe we should offer a small primitive type from our msging utils to aid with this? I've used such constructs in event handling systems prior. There's a lot more factoring that can be done after these changes as well but the quick detailed summary is, - rework the `handle_order_requests()` loop to use `match:` syntax and update the new `emsflow` table on every new request from the ems. - fix the `editOrderStatus` case pattern to not include an error msg and thus actually be triggered to respond to the ems with a `BrokerdAck` containing the new `.reqid`, the new kraken side `txid`. - skip any `openOrders` msgs which are detected as being kraken's internal order "edits" by matching on the `cancel_reason` field. - update the `emsflow` table in all ws-stream msg handling blocks with responses sent to the ems. Relates to #290 --- piker/brokers/kraken/broker.py | 326 +++++++++++++++++---------------- 1 file changed, 172 insertions(+), 154 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 1b5eb73647..c8f110b91c 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -26,9 +26,8 @@ from typing import ( Any, AsyncIterator, - # Callable, # Optional, - # Union, + Union, ) from bidict import bidict @@ -61,6 +60,16 @@ stream_messages, ) +MsgUnion = Union[ + BrokerdCancel, + BrokerdError, + BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, +] + async def handle_order_requests( @@ -68,7 +77,7 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, token: str, - requests: dict[str, BrokerdOrder], + emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], ) -> None: @@ -78,115 +87,103 @@ async def handle_order_requests( ''' # XXX: UGH, let's unify this.. with ``msgspec``. - request_msg: dict + msg: dict[str, Any] order: BrokerdOrder counter = count() - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) - - account = request_msg['account'] - - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send( - BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - ) - ) - continue - - action = request_msg['action'] - if action in {'buy', 'sell'}: - - # validate - msg = BrokerdOrder(**request_msg) - - # logic from old `Client.submit_limit()` - if msg.oid in ids: - ep = 'editOrder' - reqid = ids[msg.oid] # integer not txid - order = requests[msg.oid] - assert order.oid == msg.oid - extra = { - 'orderid': msg.reqid, # txid - } - - # XXX: TODO: get this working, but currently the EMS - # doesn't support changing order `.reqid` (in this case - # kraken changes them via a cancel and a new - # submission). So for now cancel and report the error. - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) - continue - - else: - ep = 'addOrder' - reqid = next(counter) - ids[msg.oid] = reqid - log.debug( - f"GENERATED ORDER {reqid}\n" - f'{ids}' - ) - extra = { - 'ordertype': 'limit', - 'type': msg.action, - } - - psym = msg.symbol.upper() - pair = f'{psym[:3]}/{psym[3:]}' + async for msg in ems_order_stream: + log.info(f'Rx order msg:\n{pformat(msg)}') + match msg: + case { + 'account': 'kraken.spot', + 'action': action, + } if action in {'buy', 'sell'}: + + # validate + order = BrokerdOrder(**msg) + + # logic from old `Client.submit_limit()` + if order.oid in ids: + ep = 'editOrder' + reqid = ids[order.oid] # integer not txid + last = emsflow[order.oid][-1] + assert last.reqid == order.reqid + extra = { + 'orderid': last.reqid, # txid + } + + else: + ep = 'addOrder' + reqid = next(counter) + ids[order.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' + ) + extra = { + 'ordertype': 'limit', + 'type': order.action, + } + + psym = order.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' # call ws api to submit the order: # https://docs.kraken.com/websockets/#message-addOrder - await ws.send_msg({ + req = { 'event': ep, 'token': token, 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, - 'price': str(msg.price), - 'volume': str(msg.size), + 'price': str(order.price), + 'volume': str(order.size), # only ensures request is valid, nothing more # validate: 'true', - } | extra) - - elif action == 'cancel': + } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') + await ws.send_msg(req) - msg = BrokerdCancel(**request_msg) - assert msg.oid in requests - reqid = ids[msg.oid] + # placehold for sanity checking in relay loop + emsflow.setdefault(order.oid, []).append(order) - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) + case { + 'account': 'kraken.spot', + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + assert cancel.oid in emsflow + reqid = ids[cancel.oid] - else: - log.error(f'Unknown order command: {request_msg}') + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [cancel.reqid], # should be txid from submission + }) - # placehold for sanity checking in relay loop - requests[msg.oid] = msg + case _: + account = msg.get('account') + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid request msg:\n{msg}' + ), + + ).dict() + ) @acm @@ -311,7 +308,11 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): - reqmsgs: dict[str, BrokerdOrder] = {} + # task local msg dialog tracking + emsflow: dict[ + str, + list[MsgUnion], + ] = {} # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -323,7 +324,7 @@ async def trades_dialogue( client, ems_stream, token, - reqmsgs, + emsflow, ids, ) @@ -447,15 +448,24 @@ async def trades_dialogue( # above: # https://github.com/pikers/piker/issues/293 # https://github.com/pikers/piker/issues/310 - log.info(f'Orders update {seq}:{order_msgs}') - for order_msg in order_msgs: log.info( - 'Order msg update:\n' + 'Order msg update_{seq}:\n' f'{pformat(order_msg)}' ) txid, update_msg = list(order_msg.items())[0] match update_msg: + case { + 'cancel_reason': 'Order replaced', + 'status': status, + 'userref': reqid, + **rest, + }: + # we ignore internal order updates + # triggered by kraken's "edit" + # endpoint. + continue + case { 'status': status, 'userref': reqid, @@ -490,15 +500,20 @@ async def trades_dialogue( 'open': 'submitted', 'closed': 'cancelled', 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? 'pending': 'pending', }[status] submit_vlm = rest.get('vol', 0) exec_vlm = rest.get('vol_exec', 0) + oid = ids.inverse[reqid] + msgs = emsflow[oid] + # send BrokerdStatus messages for all # order state updates - msg = BrokerdStatus( + resp = BrokerdStatus( reqid=txid, time_ns=time.time_ns(), # cuz why not @@ -519,7 +534,8 @@ async def trades_dialogue( {'name': 'kraken'}, **update_msg ), ) - await ems_stream.send(msg.dict()) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning( @@ -537,28 +553,29 @@ async def trades_dialogue( and status == 'error' ): log.error( - f'Failed to submit order {reqid}:\n' + f'Failed to submit/edit order {reqid}:\n' f'{errmsg}' ) oid = ids.inverse[reqid] - order = reqmsgs[oid] - await ems_stream.send( - BrokerdError( - oid=oid, - # use old reqid in case it changed? - reqid=order.reqid, - symbol=order.symbol, - reason=f'Failed submit:\n{errmsg}', - broker_details=resp - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # if we rx any error cancel the order again await ws.send_msg({ 'event': 'cancelOrder', 'token': token, 'reqid': reqid, - 'txid': [order.reqid], # txid from submission + 'txid': [last.reqid], # txid from submission }) case { @@ -575,49 +592,48 @@ async def trades_dialogue( # **rest, }: oid = ids.inverse[reqid] - order = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] log.info( - f'Submitting order {oid}[{reqid}]:\n' + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' f'txid: {txid}\n' - f'{descr}' ) - - # deliver ack immediately - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case { 'event': 'editOrderStatus', 'status': status, - 'errorMessage': errmsg, 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value 'txid': txid, 'originaltxid': origtxid, - # **rest, }: log.info( - f'Editting order {oid}[{reqid}]:\n' + f'Editting order {oid}[requid={reqid}]:\n' f'txid: {origtxid} -> {txid}\n' f'{descr}' ) - # deliver another ack to update the ems-side - # `.reqid`. + # deliver another ack to update the ems-side `.reqid`. oid = ids.inverse[reqid] - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # successful cancellation case { @@ -629,19 +645,20 @@ async def trades_dialogue( # TODO: should we support "batch" acking of # multiple cancels thus avoiding the below loop? oid = ids.inverse[reqid] - msg = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] for txid in txids: - await ems_stream.send( - BrokerdStatus( - reqid=txid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=resp, - ).dict() + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # failed cancel case { @@ -651,17 +668,18 @@ async def trades_dialogue( 'reqid': reqid, }: oid = ids.inverse[reqid] - msg = reqmsgs[oid] - - await ems_stream.send( - BrokerdError( - oid=oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason=f'Failed order cancel {errmsg}', - broker_details=resp - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + + resp = BrokerdError( + oid=oid, + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning(f'Unhandled trades msg: {msg}')