From e8aec5c6e0f6b102d4268f6f9600fdb7d9893001 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Jul 2022 22:00:56 -0400 Subject: [PATCH] First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296 --- piker/brokers/kraken/api.py | 1 + piker/brokers/kraken/broker.py | 457 +++++++++++++++++++++++---------- 2 files changed, 319 insertions(+), 139 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 3abf533e0b..17b79027c7 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -282,6 +282,7 @@ async def submit_limit( "volume": str(size), } return await self.endpoint('AddOrder', data) + else: # Edit order data for kraken api data["txid"] = reqid diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 588a092482..1b5eb73647 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -20,7 +20,7 @@ ''' from contextlib import asynccontextmanager as acm from functools import partial -from itertools import chain +from itertools import chain, count from pprint import pformat import time from typing import ( @@ -31,6 +31,7 @@ # Union, ) +from bidict import bidict import pendulum import trio import tractor @@ -46,7 +47,6 @@ BrokerdPosition, BrokerdStatus, ) -from piker.data.types import Struct from . import log from .api import ( Client, @@ -62,27 +62,25 @@ ) -class Trade(Struct): - ''' - Trade class that helps parse and validate ownTrades stream - - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: float # price of asset - size: float # vol of asset - broker_time: str # e.g GTC, GTD - - async def handle_order_requests( + ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, + token: str, + requests: dict[str, BrokerdOrder], + ids: bidict[str, int], ) -> None: + ''' + Process new order submission requests from the EMS + and deliver acks or errors. + ''' + # XXX: UGH, let's unify this.. with ``msgspec``. request_msg: dict order: BrokerdOrder + counter = count() async for request_msg in ems_order_stream: log.info( @@ -90,150 +88,105 @@ async def handle_order_requests( f'{pformat(request_msg)}' ) - action = request_msg['action'] + account = request_msg['account'] - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send( + BrokerdError( oid=request_msg['oid'], symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', reason=( 'Kraken only, order mode disabled due to ' 'https://github.com/pikers/piker/issues/299' ), + ) + ) + continue - )) - continue + action = request_msg['action'] + if action in {'buy', 'sell'}: # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) + msg = BrokerdOrder(**request_msg) + + # logic from old `Client.submit_limit()` + if msg.oid in ids: + ep = 'editOrder' + reqid = ids[msg.oid] # integer not txid + order = requests[msg.oid] + assert order.oid == msg.oid + extra = { + 'orderid': msg.reqid, # txid + } + + # XXX: TODO: get this working, but currently the EMS + # doesn't support changing order `.reqid` (in this case + # kraken changes them via a cancel and a new + # submission). So for now cancel and report the error. + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [msg.reqid], # should be txid from submission + }) + continue - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ) - ) else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] - else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ) + ep = 'addOrder' + reqid = next(counter) + ids[msg.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' ) + extra = { + 'ordertype': 'limit', + 'type': msg.action, + } - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) + psym = msg.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) + # call ws api to submit the order: + # https://docs.kraken.com/websockets/#message-addOrder + await ws.send_msg({ + 'event': ep, + 'token': token, - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. - try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ) - ) + 'reqid': reqid, # remapped-to-int uid from ems + 'pair': pair, + 'price': str(msg.price), + 'volume': str(msg.size), - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') + # only ensures request is valid, nothing more + # validate: 'true', - else: - if not count: # no orders were cancelled? + } | extra) - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder + elif action == 'cancel': - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') + msg = BrokerdCancel(**request_msg) + assert msg.oid in requests + reqid = ids[msg.oid] - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp - ) - ) + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [msg.reqid], # should be txid from submission + }) - else: # order cancel success case. + else: + log.error(f'Unknown order command: {request_msg}') - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ) - ) - else: - log.error(f'Unknown order command: {request_msg}') + # placehold for sanity checking in relay loop + requests[msg.oid] = msg @acm @@ -358,8 +311,21 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): + reqmsgs: dict[str, BrokerdOrder] = {} + + # 2way map for ems ids to kraken int reqids.. + ids: bidict[str, int] = bidict() + # task for processing inbound requests from ems - n.start_soon(handle_order_requests, client, ems_stream) + n.start_soon( + handle_order_requests, + ws, + client, + ems_stream, + token, + reqmsgs, + ids, + ) count: int = 0 @@ -472,7 +438,7 @@ async def trades_dialogue( await ems_stream.send(pp_msg) case [ - trades_msgs, + order_msgs, 'openOrders', {'sequence': seq}, ]: @@ -481,11 +447,224 @@ async def trades_dialogue( # above: # https://github.com/pikers/piker/issues/293 # https://github.com/pikers/piker/issues/310 - log.info(f'Order update {seq}:{trades_msgs}') + log.info(f'Orders update {seq}:{order_msgs}') + + for order_msg in order_msgs: + log.info( + 'Order msg update:\n' + f'{pformat(order_msg)}' + ) + txid, update_msg = list(order_msg.items())[0] + match update_msg: + case { + 'status': status, + 'userref': reqid, + **rest, + + # 'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm, # 0.0000 + }: + ems_status = { + 'open': 'submitted', + 'closed': 'cancelled', + 'canceled': 'cancelled', + 'pending': 'pending', + }[status] + + submit_vlm = rest.get('vol', 0) + exec_vlm = rest.get('vol_exec', 0) + + # send BrokerdStatus messages for all + # order state updates + msg = BrokerdStatus( + + reqid=txid, + time_ns=time.time_ns(), # cuz why not + account=f'kraken.{acctid}', + + # everyone doin camel case.. + status=ems_status, # force lower case + + filled=exec_vlm, + reason='', # why held? + remaining=( + float(submit_vlm) + - + float(exec_vlm) + ), + + broker_details=dict( + {'name': 'kraken'}, **update_msg + ), + ) + await ems_stream.send(msg.dict()) + + case _: + log.warning( + 'Unknown orders msg:\n' + f'{txid}:{order_msg}' + ) + + case { + 'event': etype, + 'status': status, + 'errorMessage': errmsg, + 'reqid': reqid, + } if ( + etype in {'addOrderStatus', 'editOrderStatus'} + and status == 'error' + ): + log.error( + f'Failed to submit order {reqid}:\n' + f'{errmsg}' + ) + oid = ids.inverse[reqid] + order = reqmsgs[oid] + await ems_stream.send( + BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=order.reqid, + symbol=order.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp + ).dict() + ) + + # if we rx any error cancel the order again + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [order.reqid], # txid from submission + }) + + case { + 'event': 'addOrderStatus', + 'status': status, + 'reqid': reqid, # oid from ems side + + # NOTE: in the case of an edit request this is + # a new value! + 'txid': txid, + + 'descr': descr, # only on success? + # 'originaltxid': txid, # only on edits + # **rest, + }: + oid = ids.inverse[reqid] + order = reqmsgs[oid] + log.info( + f'Submitting order {oid}[{reqid}]:\n' + f'txid: {txid}\n' + f'{descr}' + ) + + # deliver ack immediately + await ems_stream.send( + BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=order.account, # piker account + ).dict() + ) + + case { + 'event': 'editOrderStatus', + 'status': status, + 'errorMessage': errmsg, + 'reqid': reqid, # oid from ems side + 'descr': descr, + + # NOTE: for edit request this is a new value + 'txid': txid, + 'originaltxid': origtxid, + # **rest, + }: + log.info( + f'Editting order {oid}[{reqid}]:\n' + f'txid: {origtxid} -> {txid}\n' + f'{descr}' + ) + # deliver another ack to update the ems-side + # `.reqid`. + oid = ids.inverse[reqid] + await ems_stream.send( + BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=order.account, # piker account + ).dict() + ) + + # successful cancellation + case { + "event": "cancelOrderStatus", + "status": "ok", + 'txid': txids, + 'reqid': reqid, + }: + # TODO: should we support "batch" acking of + # multiple cancels thus avoiding the below loop? + oid = ids.inverse[reqid] + msg = reqmsgs[oid] + + for txid in txids: + await ems_stream.send( + BrokerdStatus( + reqid=txid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, + ).dict() + ) + + # failed cancel + case { + "event": "cancelOrderStatus", + "status": "error", + "errorMessage": errmsg, + 'reqid': reqid, + }: + oid = ids.inverse[reqid] + msg = reqmsgs[oid] + + await ems_stream.send( + BrokerdError( + oid=oid, + reqid=msg.reqid, + symbol=msg.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp + ).dict() + ) case _: log.warning(f'Unhandled trades msg: {msg}') - await tractor.breakpoint() def norm_trade_records(