diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 3abf533e0b..17b79027c7 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -282,6 +282,7 @@ async def submit_limit( "volume": str(size), } return await self.endpoint('AddOrder', data) + else: # Edit order data for kraken api data["txid"] = reqid diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 588a092482..1b5eb73647 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -20,7 +20,7 @@ ''' from contextlib import asynccontextmanager as acm from functools import partial -from itertools import chain +from itertools import chain, count from pprint import pformat import time from typing import ( @@ -31,6 +31,7 @@ # Union, ) +from bidict import bidict import pendulum import trio import tractor @@ -46,7 +47,6 @@ BrokerdPosition, BrokerdStatus, ) -from piker.data.types import Struct from . import log from .api import ( Client, @@ -62,27 +62,25 @@ ) -class Trade(Struct): - ''' - Trade class that helps parse and validate ownTrades stream - - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: float # price of asset - size: float # vol of asset - broker_time: str # e.g GTC, GTD - - async def handle_order_requests( + ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, + token: str, + requests: dict[str, BrokerdOrder], + ids: bidict[str, int], ) -> None: + ''' + Process new order submission requests from the EMS + and deliver acks or errors. + ''' + # XXX: UGH, let's unify this.. with ``msgspec``. request_msg: dict order: BrokerdOrder + counter = count() async for request_msg in ems_order_stream: log.info( @@ -90,150 +88,105 @@ async def handle_order_requests( f'{pformat(request_msg)}' ) - action = request_msg['action'] + account = request_msg['account'] - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send( + BrokerdError( oid=request_msg['oid'], symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', reason=( 'Kraken only, order mode disabled due to ' 'https://github.com/pikers/piker/issues/299' ), + ) + ) + continue - )) - continue + action = request_msg['action'] + if action in {'buy', 'sell'}: # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) + msg = BrokerdOrder(**request_msg) + + # logic from old `Client.submit_limit()` + if msg.oid in ids: + ep = 'editOrder' + reqid = ids[msg.oid] # integer not txid + order = requests[msg.oid] + assert order.oid == msg.oid + extra = { + 'orderid': msg.reqid, # txid + } + + # XXX: TODO: get this working, but currently the EMS + # doesn't support changing order `.reqid` (in this case + # kraken changes them via a cancel and a new + # submission). So for now cancel and report the error. + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [msg.reqid], # should be txid from submission + }) + continue - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ) - ) else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] - else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ) + ep = 'addOrder' + reqid = next(counter) + ids[msg.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' ) + extra = { + 'ordertype': 'limit', + 'type': msg.action, + } - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) + psym = msg.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) + # call ws api to submit the order: + # https://docs.kraken.com/websockets/#message-addOrder + await ws.send_msg({ + 'event': ep, + 'token': token, - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. - try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ) - ) + 'reqid': reqid, # remapped-to-int uid from ems + 'pair': pair, + 'price': str(msg.price), + 'volume': str(msg.size), - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') + # only ensures request is valid, nothing more + # validate: 'true', - else: - if not count: # no orders were cancelled? + } | extra) - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder + elif action == 'cancel': - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') + msg = BrokerdCancel(**request_msg) + assert msg.oid in requests + reqid = ids[msg.oid] - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp - ) - ) + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [msg.reqid], # should be txid from submission + }) - else: # order cancel success case. + else: + log.error(f'Unknown order command: {request_msg}') - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ) - ) - else: - log.error(f'Unknown order command: {request_msg}') + # placehold for sanity checking in relay loop + requests[msg.oid] = msg @acm @@ -358,8 +311,21 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): + reqmsgs: dict[str, BrokerdOrder] = {} + + # 2way map for ems ids to kraken int reqids.. + ids: bidict[str, int] = bidict() + # task for processing inbound requests from ems - n.start_soon(handle_order_requests, client, ems_stream) + n.start_soon( + handle_order_requests, + ws, + client, + ems_stream, + token, + reqmsgs, + ids, + ) count: int = 0 @@ -472,7 +438,7 @@ async def trades_dialogue( await ems_stream.send(pp_msg) case [ - trades_msgs, + order_msgs, 'openOrders', {'sequence': seq}, ]: @@ -481,11 +447,224 @@ async def trades_dialogue( # above: # https://github.com/pikers/piker/issues/293 # https://github.com/pikers/piker/issues/310 - log.info(f'Order update {seq}:{trades_msgs}') + log.info(f'Orders update {seq}:{order_msgs}') + + for order_msg in order_msgs: + log.info( + 'Order msg update:\n' + f'{pformat(order_msg)}' + ) + txid, update_msg = list(order_msg.items())[0] + match update_msg: + case { + 'status': status, + 'userref': reqid, + **rest, + + # 'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm, # 0.0000 + }: + ems_status = { + 'open': 'submitted', + 'closed': 'cancelled', + 'canceled': 'cancelled', + 'pending': 'pending', + }[status] + + submit_vlm = rest.get('vol', 0) + exec_vlm = rest.get('vol_exec', 0) + + # send BrokerdStatus messages for all + # order state updates + msg = BrokerdStatus( + + reqid=txid, + time_ns=time.time_ns(), # cuz why not + account=f'kraken.{acctid}', + + # everyone doin camel case.. + status=ems_status, # force lower case + + filled=exec_vlm, + reason='', # why held? + remaining=( + float(submit_vlm) + - + float(exec_vlm) + ), + + broker_details=dict( + {'name': 'kraken'}, **update_msg + ), + ) + await ems_stream.send(msg.dict()) + + case _: + log.warning( + 'Unknown orders msg:\n' + f'{txid}:{order_msg}' + ) + + case { + 'event': etype, + 'status': status, + 'errorMessage': errmsg, + 'reqid': reqid, + } if ( + etype in {'addOrderStatus', 'editOrderStatus'} + and status == 'error' + ): + log.error( + f'Failed to submit order {reqid}:\n' + f'{errmsg}' + ) + oid = ids.inverse[reqid] + order = reqmsgs[oid] + await ems_stream.send( + BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=order.reqid, + symbol=order.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp + ).dict() + ) + + # if we rx any error cancel the order again + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [order.reqid], # txid from submission + }) + + case { + 'event': 'addOrderStatus', + 'status': status, + 'reqid': reqid, # oid from ems side + + # NOTE: in the case of an edit request this is + # a new value! + 'txid': txid, + + 'descr': descr, # only on success? + # 'originaltxid': txid, # only on edits + # **rest, + }: + oid = ids.inverse[reqid] + order = reqmsgs[oid] + log.info( + f'Submitting order {oid}[{reqid}]:\n' + f'txid: {txid}\n' + f'{descr}' + ) + + # deliver ack immediately + await ems_stream.send( + BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=order.account, # piker account + ).dict() + ) + + case { + 'event': 'editOrderStatus', + 'status': status, + 'errorMessage': errmsg, + 'reqid': reqid, # oid from ems side + 'descr': descr, + + # NOTE: for edit request this is a new value + 'txid': txid, + 'originaltxid': origtxid, + # **rest, + }: + log.info( + f'Editting order {oid}[{reqid}]:\n' + f'txid: {origtxid} -> {txid}\n' + f'{descr}' + ) + # deliver another ack to update the ems-side + # `.reqid`. + oid = ids.inverse[reqid] + await ems_stream.send( + BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=order.account, # piker account + ).dict() + ) + + # successful cancellation + case { + "event": "cancelOrderStatus", + "status": "ok", + 'txid': txids, + 'reqid': reqid, + }: + # TODO: should we support "batch" acking of + # multiple cancels thus avoiding the below loop? + oid = ids.inverse[reqid] + msg = reqmsgs[oid] + + for txid in txids: + await ems_stream.send( + BrokerdStatus( + reqid=txid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, + ).dict() + ) + + # failed cancel + case { + "event": "cancelOrderStatus", + "status": "error", + "errorMessage": errmsg, + 'reqid': reqid, + }: + oid = ids.inverse[reqid] + msg = reqmsgs[oid] + + await ems_stream.send( + BrokerdError( + oid=oid, + reqid=msg.reqid, + symbol=msg.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp + ).dict() + ) case _: log.warning(f'Unhandled trades msg: {msg}') - await tractor.breakpoint() def norm_trade_records(