diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 1b5eb73647..c8f110b91c 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -26,9 +26,8 @@ from typing import ( Any, AsyncIterator, - # Callable, # Optional, - # Union, + Union, ) from bidict import bidict @@ -61,6 +60,16 @@ stream_messages, ) +MsgUnion = Union[ + BrokerdCancel, + BrokerdError, + BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, +] + async def handle_order_requests( @@ -68,7 +77,7 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, token: str, - requests: dict[str, BrokerdOrder], + emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], ) -> None: @@ -78,115 +87,103 @@ async def handle_order_requests( ''' # XXX: UGH, let's unify this.. with ``msgspec``. - request_msg: dict + msg: dict[str, Any] order: BrokerdOrder counter = count() - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) - - account = request_msg['account'] - - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send( - BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - ) - ) - continue - - action = request_msg['action'] - if action in {'buy', 'sell'}: - - # validate - msg = BrokerdOrder(**request_msg) - - # logic from old `Client.submit_limit()` - if msg.oid in ids: - ep = 'editOrder' - reqid = ids[msg.oid] # integer not txid - order = requests[msg.oid] - assert order.oid == msg.oid - extra = { - 'orderid': msg.reqid, # txid - } - - # XXX: TODO: get this working, but currently the EMS - # doesn't support changing order `.reqid` (in this case - # kraken changes them via a cancel and a new - # submission). So for now cancel and report the error. - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) - continue - - else: - ep = 'addOrder' - reqid = next(counter) - ids[msg.oid] = reqid - log.debug( - f"GENERATED ORDER {reqid}\n" - f'{ids}' - ) - extra = { - 'ordertype': 'limit', - 'type': msg.action, - } - - psym = msg.symbol.upper() - pair = f'{psym[:3]}/{psym[3:]}' + async for msg in ems_order_stream: + log.info(f'Rx order msg:\n{pformat(msg)}') + match msg: + case { + 'account': 'kraken.spot', + 'action': action, + } if action in {'buy', 'sell'}: + + # validate + order = BrokerdOrder(**msg) + + # logic from old `Client.submit_limit()` + if order.oid in ids: + ep = 'editOrder' + reqid = ids[order.oid] # integer not txid + last = emsflow[order.oid][-1] + assert last.reqid == order.reqid + extra = { + 'orderid': last.reqid, # txid + } + + else: + ep = 'addOrder' + reqid = next(counter) + ids[order.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' + ) + extra = { + 'ordertype': 'limit', + 'type': order.action, + } + + psym = order.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' # call ws api to submit the order: # https://docs.kraken.com/websockets/#message-addOrder - await ws.send_msg({ + req = { 'event': ep, 'token': token, 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, - 'price': str(msg.price), - 'volume': str(msg.size), + 'price': str(order.price), + 'volume': str(order.size), # only ensures request is valid, nothing more # validate: 'true', - } | extra) - - elif action == 'cancel': + } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') + await ws.send_msg(req) - msg = BrokerdCancel(**request_msg) - assert msg.oid in requests - reqid = ids[msg.oid] + # placehold for sanity checking in relay loop + emsflow.setdefault(order.oid, []).append(order) - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) + case { + 'account': 'kraken.spot', + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + assert cancel.oid in emsflow + reqid = ids[cancel.oid] - else: - log.error(f'Unknown order command: {request_msg}') + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [cancel.reqid], # should be txid from submission + }) - # placehold for sanity checking in relay loop - requests[msg.oid] = msg + case _: + account = msg.get('account') + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid request msg:\n{msg}' + ), + + ).dict() + ) @acm @@ -311,7 +308,11 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): - reqmsgs: dict[str, BrokerdOrder] = {} + # task local msg dialog tracking + emsflow: dict[ + str, + list[MsgUnion], + ] = {} # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -323,7 +324,7 @@ async def trades_dialogue( client, ems_stream, token, - reqmsgs, + emsflow, ids, ) @@ -447,15 +448,24 @@ async def trades_dialogue( # above: # https://github.com/pikers/piker/issues/293 # https://github.com/pikers/piker/issues/310 - log.info(f'Orders update {seq}:{order_msgs}') - for order_msg in order_msgs: log.info( - 'Order msg update:\n' + 'Order msg update_{seq}:\n' f'{pformat(order_msg)}' ) txid, update_msg = list(order_msg.items())[0] match update_msg: + case { + 'cancel_reason': 'Order replaced', + 'status': status, + 'userref': reqid, + **rest, + }: + # we ignore internal order updates + # triggered by kraken's "edit" + # endpoint. + continue + case { 'status': status, 'userref': reqid, @@ -490,15 +500,20 @@ async def trades_dialogue( 'open': 'submitted', 'closed': 'cancelled', 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? 'pending': 'pending', }[status] submit_vlm = rest.get('vol', 0) exec_vlm = rest.get('vol_exec', 0) + oid = ids.inverse[reqid] + msgs = emsflow[oid] + # send BrokerdStatus messages for all # order state updates - msg = BrokerdStatus( + resp = BrokerdStatus( reqid=txid, time_ns=time.time_ns(), # cuz why not @@ -519,7 +534,8 @@ async def trades_dialogue( {'name': 'kraken'}, **update_msg ), ) - await ems_stream.send(msg.dict()) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning( @@ -537,28 +553,29 @@ async def trades_dialogue( and status == 'error' ): log.error( - f'Failed to submit order {reqid}:\n' + f'Failed to submit/edit order {reqid}:\n' f'{errmsg}' ) oid = ids.inverse[reqid] - order = reqmsgs[oid] - await ems_stream.send( - BrokerdError( - oid=oid, - # use old reqid in case it changed? - reqid=order.reqid, - symbol=order.symbol, - reason=f'Failed submit:\n{errmsg}', - broker_details=resp - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # if we rx any error cancel the order again await ws.send_msg({ 'event': 'cancelOrder', 'token': token, 'reqid': reqid, - 'txid': [order.reqid], # txid from submission + 'txid': [last.reqid], # txid from submission }) case { @@ -575,49 +592,48 @@ async def trades_dialogue( # **rest, }: oid = ids.inverse[reqid] - order = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] log.info( - f'Submitting order {oid}[{reqid}]:\n' + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' f'txid: {txid}\n' - f'{descr}' ) - - # deliver ack immediately - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case { 'event': 'editOrderStatus', 'status': status, - 'errorMessage': errmsg, 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value 'txid': txid, 'originaltxid': origtxid, - # **rest, }: log.info( - f'Editting order {oid}[{reqid}]:\n' + f'Editting order {oid}[requid={reqid}]:\n' f'txid: {origtxid} -> {txid}\n' f'{descr}' ) - # deliver another ack to update the ems-side - # `.reqid`. + # deliver another ack to update the ems-side `.reqid`. oid = ids.inverse[reqid] - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # successful cancellation case { @@ -629,19 +645,20 @@ async def trades_dialogue( # TODO: should we support "batch" acking of # multiple cancels thus avoiding the below loop? oid = ids.inverse[reqid] - msg = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] for txid in txids: - await ems_stream.send( - BrokerdStatus( - reqid=txid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=resp, - ).dict() + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # failed cancel case { @@ -651,17 +668,18 @@ async def trades_dialogue( 'reqid': reqid, }: oid = ids.inverse[reqid] - msg = reqmsgs[oid] - - await ems_stream.send( - BrokerdError( - oid=oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason=f'Failed order cancel {errmsg}', - broker_details=resp - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + + resp = BrokerdError( + oid=oid, + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning(f'Unhandled trades msg: {msg}')