diff --git a/piker/_daemon.py b/piker/_daemon.py index 82dc848a0..31061b37e 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -22,13 +22,13 @@ from contextlib import asynccontextmanager as acm from collections import defaultdict -from pydantic import BaseModel +import tractor import trio from trio_typing import TaskStatus -import tractor from .log import get_logger, get_console_log from .brokers import get_brokermod +from .data.types import Struct log = get_logger(__name__) @@ -47,16 +47,13 @@ ] -class Services(BaseModel): +class Services(Struct): actor_n: tractor._supervise.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} - class Config: - arbitrary_types_allowed = True - async def start_service_task( self, name: str, @@ -207,23 +204,26 @@ async def open_piker_runtime( assert _services is None # XXX: this may open a root actor as well - async with ( - tractor.open_root_actor( + with tractor.msg.configure_native_msgs( + [Struct], + ): + async with ( + tractor.open_root_actor( - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, + # passed through to ``open_root_actor`` + arbiter_addr=_registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - ) as _, - ): - yield tractor.current_actor() + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + ) as _, + ): + yield tractor.current_actor() @acm @@ -263,27 +263,31 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - # subtle, we must have the runtime up here or portal lookup will fail - async with maybe_open_runtime(loglevel, **kwargs): + # XXX: this may open a root actor as well + with tractor.msg.configure_native_msgs( + [Struct], + ): + # subtle, we must have the runtime up here or portal lookup will fail + async with maybe_open_runtime(loglevel, **kwargs): - async with tractor.find_actor(_root_dname) as portal: - # assert portal is not None - if portal is not None: - yield portal - return + async with tractor.find_actor(_root_dname) as portal: + # assert portal is not None + if portal is not None: + yield portal + return - # presume pikerd role since no daemon could be found at - # configured address - async with open_pikerd( + # presume pikerd role since no daemon could be found at + # configured address + async with open_pikerd( - loglevel=loglevel, - debug_mode=kwargs.get('debug_mode', False), + loglevel=loglevel, + debug_mode=kwargs.get('debug_mode', False), - ) as _: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - yield None + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. 
+ yield None # brokerd enabled modules @@ -445,7 +449,7 @@ async def spawn_brokerd( ) # non-blocking setup of brokerd service nursery - from .data import _setup_persistent_brokerd + from .data.feed import _setup_persistent_brokerd await _services.start_service_task( dname, diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index e528172ec..add23b185 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -34,13 +34,13 @@ import numpy as np import tractor from pydantic.dataclasses import dataclass -from pydantic import BaseModel import wsproto from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data.types import Struct from ..data._web_bs import open_autorecon_ws, NoBsWs log = get_logger(__name__) @@ -79,12 +79,14 @@ # https://binance-docs.github.io/apidocs/spot/en/#exchange-information -class Pair(BaseModel): +class Pair(Struct, frozen=True): symbol: str status: str baseAsset: str baseAssetPrecision: int + cancelReplaceAllowed: bool + allowTrailingStop: bool quoteAsset: str quotePrecision: int quoteAssetPrecision: int @@ -287,7 +289,7 @@ async def get_client() -> Client: # validation type -class AggTrade(BaseModel): +class AggTrade(Struct): e: str # Event type E: int # Event time s: str # Symbol @@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: elif msg.get('e') == 'aggTrade': - # validate + # NOTE: this is purely for a definition, ``msgspec.Struct`` + # does not runtime-validate until you decode/encode. + # see: https://jcristharif.com/msgspec/structs.html#type-validation msg = AggTrade(**msg) # TODO: type out and require this quote format @@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: 'brokerd_ts': time.time(), 'ticks': [{ 'type': 'trade', - 'price': msg.p, - 'size': msg.q, + 'price': float(msg.p), + 'size': float(msg.q), 'broker_ts': msg.T, }], } @@ -448,7 +452,7 @@ async def stream_quotes( d = cache[sym.upper()] syminfo = Pair(**d) # validation - si = sym_infos[sym] = syminfo.dict() + si = sym_infos[sym] = syminfo.to_dict() # XXX: after manually inspecting the response format we # just directly pick out the info we need diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b768c9a1b..4737d3768 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -148,7 +148,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No account found: `{account}` ?', - ).dict()) + )) continue client = _accounts2clients.get(account) @@ -161,7 +161,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No api client loaded for account: `{account}` ?', - ).dict()) + )) continue if action in {'buy', 'sell'}: @@ -188,7 +188,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason='Order already active?', - ).dict()) + )) # deliver ack that order has been submitted to broker routing await ems_order_stream.send( @@ -197,9 +197,8 @@ async def handle_order_requests( oid=order.oid, # broker specific request id reqid=reqid, - time_ns=time.time_ns(), account=account, - ).dict() + ) ) elif action == 'cancel': @@ -559,7 +558,7 @@ async def open_stream( cids2pps, validate=True, ) - all_positions.extend(msg.dict() for msg in msgs) + all_positions.extend(msg for msg in msgs) if not all_positions and cids2pps: raise 
RuntimeError( @@ -665,7 +664,7 @@ async def emit_pp_update( msg = msgs[0] break - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) async def deliver_trade_events( @@ -743,7 +742,7 @@ async def deliver_trade_events( broker_details={'name': 'ib'}, ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) case 'fill': @@ -803,7 +802,7 @@ async def deliver_trade_events( broker_time=trade_entry['broker_time'], ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) # 2 cases: # - fill comes first or @@ -879,7 +878,7 @@ async def deliver_trade_events( cid, msg = pack_position(item) # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! - # await ems_stream.send(msg.dict()) + # await ems_stream.send(msg) case 'event': @@ -891,7 +890,7 @@ async def deliver_trade_events( # level... # reqid = item.get('reqid', 0) # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + # log.info(f"TWS triggered trade\n{pformat(msg)}") # msg.reqid = 'tws-' + str(-1 * reqid) diff --git a/piker/brokers/kraken/README.rst b/piker/brokers/kraken/README.rst new file mode 100644 index 000000000..afeaeb2b6 --- /dev/null +++ b/piker/brokers/kraken/README.rst @@ -0,0 +1,64 @@ +``kraken`` backend +------------------ +though they don't have the most liquidity of all the cexes they sure are +accommodating to those of us who appreciate a little ``xmr``. + +status +****** +current support is *production grade* and both real-time data and order +management should be correct and fast. this backend is used by core devs +for live trading. + + +config +****** +In order to get order mode support your ``brokers.toml`` +needs to have something like the following: + +.. code:: toml + + [kraken] + accounts.spot = 'spot' + key_descr = "spot" + api_key = "69696969696969696696969696969696969696969696969696969696" + secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696" + + +If everything works correctly you should see any current positions +loaded in the pps pane on chart load and you should also be able to +check your trade records in the file:: + + /ledgers/trades_kraken_spot.toml + + +An example ledger file will have entries written verbatim from the +trade events schema: + +.. code:: toml + + [TFJBKK-SMBZS-VJ4UWS] + ordertxid = "SMBZSA-7CNQU-3HWLNJ" + postxid = "SMBZSE-M7IF5-CFI7LT" + pair = "XXMRZEUR" + time = 1655691993.4133966 + type = "buy" + ordertype = "limit" + price = "103.97000000" + cost = "499.99999977" + fee = "0.80000000" + vol = "4.80907954" + margin = "0.00000000" + misc = "" + + +your ``pps.toml`` file will have position entries like, + +.. 
code:: toml + + [kraken.spot."xmreur.kraken"] + size = 4.80907954 + be_price = 103.97000000 + bsuid = "XXMRZEUR" + clears = [ + { tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" }, + ] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 3abf533e0..17b79027c 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -282,6 +282,7 @@ async def submit_limit( "volume": str(size), } return await self.endpoint('AddOrder', data) + else: # Edit order data for kraken api data["txid"] = reqid diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 4e2e02f64..c00277923 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,21 +18,22 @@ Order api and machinery ''' -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from functools import partial -from itertools import chain +from itertools import chain, count from pprint import pformat import time from typing import ( Any, AsyncIterator, - # Callable, - # Optional, - # Union, + Union, ) +from bidict import bidict import pendulum -from pydantic import BaseModel import trio import tractor import wsproto @@ -61,179 +62,130 @@ stream_messages, ) - -class Trade(BaseModel): - ''' - Trade class that helps parse and validate ownTrades stream - - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: float # price of asset - size: float # vol of asset - broker_time: str # e.g GTC, GTD +MsgUnion = Union[ + BrokerdCancel, + BrokerdError, + BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, +] async def handle_order_requests( + ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, + token: str, + emsflow: dict[str, list[MsgUnion]], + ids: bidict[str, int], ) -> None: + ''' + Process new order submission requests from the EMS + and deliver acks or errors. - request_msg: dict + ''' + # XXX: UGH, let's unify this.. with ``msgspec``. 
+ msg: dict[str, Any] order: BrokerdOrder + counter = count() + + async for msg in ems_order_stream: + log.info(f'Rx order msg:\n{pformat(msg)}') + match msg: + case { + 'account': 'kraken.spot', + 'action': action, + } if action in {'buy', 'sell'}: + + # validate + order = BrokerdOrder(**msg) + + # logic from old `Client.submit_limit()` + if order.oid in ids: + ep = 'editOrder' + reqid = ids[order.oid] # integer not txid + last = emsflow[order.oid][-1] + assert last.reqid == order.reqid + extra = { + 'orderid': last.reqid, # txid + } - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) - - action = request_msg['action'] - - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - - ).dict()) - continue - - # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) - - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ).dict() - ) - else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) - - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. 
- try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') + ep = 'addOrder' + reqid = next(counter) + ids[order.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' + ) + extra = { + 'ordertype': 'limit', + 'type': order.action, + } + + psym = order.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' + + # call ws api to submit the order: + # https://docs.kraken.com/websockets/#message-addOrder + req = { + 'event': ep, + 'token': token, + + 'reqid': reqid, # remapped-to-int uid from ems + 'pair': pair, + 'price': str(order.price), + 'volume': str(order.size), + + # only ensures request is valid, nothing more + # validate: 'true', + + } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') + await ws.send_msg(req) + + # placehold for sanity checking in relay loop + emsflow.setdefault(order.oid, []).append(order) + + case { + 'account': 'kraken.spot', + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + assert cancel.oid in emsflow + reqid = ids[cancel.oid] + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [cancel.reqid], # should be txid from submission + }) + + case _: + account = msg.get('account') + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) await ems_order_stream.send( BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ).dict() - ) + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid request msg:\n{msg}' + ), - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') - - else: - if not count: # no orders were cancelled? - - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder - - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp - ).dict() - ) - - else: # order cancel success case. 
- - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() ) - else: - log.error(f'Unknown order command: {request_msg}') + ) @acm @@ -310,13 +262,13 @@ async def trades_dialogue( log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys(t.bsuid for t in trans), - ) + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys(t.bsuid for t in trans), + ) position_msgs: list[dict] = [] pps: dict[int, pp.Position] @@ -330,7 +282,7 @@ async def trades_dialogue( avg_price=p.be_price, currency='', ) - position_msgs.append(msg.dict()) + position_msgs.append(msg) await ctx.started( (position_msgs, [acc_name]) @@ -358,134 +310,410 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): + # task local msg dialog tracking + emsflow: dict[ + str, + list[MsgUnion], + ] = {} + + # 2way map for ems ids to kraken int reqids.. + ids: bidict[str, int] = bidict() + # task for processing inbound requests from ems - n.start_soon(handle_order_requests, client, ems_stream) + n.start_soon( + handle_order_requests, + ws, + client, + ems_stream, + token, + emsflow, + ids, + ) + + # enter relay loop + await handle_order_updates( + ws, + ems_stream, + emsflow, + ids, + trans, + acctid, + acc_name, + token, + ) + + +async def handle_order_updates( + ws: NoBsWs, + ems_stream: tractor.MsgStream, + emsflow: dict[str, list[MsgUnion]], + ids: bidict[str, int], + trans: list[pp.Transaction], + acctid: str, + acc_name: str, + token: str, + +) -> None: + ''' + Main msg handling loop for all things order management. + + This code is broken out to make the context explicit and state variables + defined in the signature clear to the reader. - count: int = 0 + ''' + # transaction records which will be updated + # on new trade clearing events (aka order "fills") + trans: list[pp.Transaction] - # process and relay trades events to ems + async for msg in stream_messages(ws): + match msg: + # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades - async for msg in stream_messages(ws): - match msg: - case [ - trades_msgs, - 'ownTrades', - {'sequence': seq}, - ]: - # XXX: do we actually need this orrr? - # ensure that we are only processing new trades? - assert seq > count - count += 1 - - # flatten msgs for processing - trades = { - tid: trade - for entry in trades_msgs - for (tid, trade) in entry.items() - - # only emit entries which are already not-in-ledger - if tid not in {r.tid for r in trans} - } - for tid, trade in trades.items(): - - # parse-cast - reqid = trade['ordertxid'] - action = trade['type'] - price = float(trade['price']) - size = float(trade['vol']) - broker_time = float(trade['time']) - - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), - - action=action, - size=size, - price=price, - # TODO: maybe capture more msg data - # i.e fees? 
- broker_details={'name': 'kraken'}, - broker_time=broker_time - ) - await ems_stream.send(fill_msg.dict()) - - filled_msg = BrokerdStatus( - reqid=reqid, - time_ns=time.time_ns(), - - account=acc_name, - status='filled', - filled=size, - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': broker_time - }, - - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) - - # update ledger and position tracking - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) - - # emit pp msgs - for pos in filter( - bool, - chain(active.values(), closed.values()), - ): - pp_msg = BrokerdPosition( - broker='kraken', - - # XXX: ok so this is annoying, we're - # relaying an account name with the - # backend suffix prefixed but when - # reading accounts from ledgers we - # don't need it and/or it's prefixed - # in the section table.. we should - # just strip this from the message - # right since `.broker` is already - # included? + case [ + trades_msgs, + 'ownTrades', + {'sequence': seq}, + ]: + # flatten msgs to an {id -> data} table for processing + trades = { + tid: trade + for entry in trades_msgs + for (tid, trade) in entry.items() + + # only emit entries which are already not-in-ledger + if tid not in {r.tid for r in trans} + } + for tid, trade in trades.items(): + + # parse-cast + reqid = trade['ordertxid'] + action = trade['type'] + price = float(trade['price']) + size = float(trade['vol']) + broker_time = float(trade['time']) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), + + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + await ems_stream.send(fill_msg) + + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), + + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg) + + # update ledger and position tracking + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) + + # emit any new pp msgs to ems + for pos in filter( + bool, + chain(active.values(), closed.values()), + ): + pp_msg = BrokerdPosition( + broker='kraken', + + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? 
+ account=f'kraken.{acctid}', + symbol=pos.symbol.front_fqsn(), + size=pos.size, + avg_price=pos.be_price, + + # TODO + # currency='' + ) + await ems_stream.send(pp_msg) + + # process and relay order state change events + # https://docs.kraken.com/websockets/#message-openOrders + case [ + order_msgs, + 'openOrders', + {'sequence': seq}, + ]: + for order_msg in order_msgs: + log.info( + f'Order msg update_{seq}:\n' + f'{pformat(order_msg)}' + ) + txid, update_msg = list(order_msg.items())[0] + match update_msg: + + # we ignore internal order updates triggered by + # kraken's "edit" endpoint. + case { + 'cancel_reason': 'Order replaced', + 'status': status, + 'userref': reqid, + **rest, + }: + continue + + case { + 'status': status, + 'userref': reqid, + **rest, + + # XXX: eg. of remaining msg schema: + # 'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm, # 0.0000 + }: + ems_status = { + 'open': 'submitted', + 'closed': 'cancelled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] + + submit_vlm = rest.get('vol', 0) + exec_vlm = rest.get('vol_exec', 0) + + oid = ids.inverse[reqid] + msgs = emsflow[oid] + + # send BrokerdStatus messages for all + # order state updates + resp = BrokerdStatus( + + reqid=txid, + time_ns=time.time_ns(), # cuz why not account=f'kraken.{acctid}', - symbol=pos.symbol.front_fqsn(), - size=pos.size, - avg_price=pos.be_price, - # TODO - # currency='' + # everyone doin camel case.. + status=ems_status, # force lower case + + filled=exec_vlm, + reason='', # why held? 
+ remaining=( + float(submit_vlm) + - + float(exec_vlm) + ), + + broker_details=dict( + {'name': 'kraken'}, **update_msg + ), ) - await ems_stream.send(pp_msg.dict()) + msgs.append(resp) + await ems_stream.send(resp) - case [ - trades_msgs, - 'openOrders', - {'sequence': seq}, - ]: - # TODO: async order update handling which we - # should remove from `handle_order_requests()` - # above: - # https://github.com/pikers/piker/issues/293 - # https://github.com/pikers/piker/issues/310 - log.info(f'Order update {seq}:{trades_msgs}') + case _: + log.warning( + 'Unknown orders msg:\n' + f'{txid}:{order_msg}' + ) - case _: - log.warning(f'Unhandled trades msg: {msg}') - await tractor.breakpoint() + case { + 'event': etype, + 'status': status, + 'reqid': reqid, + } as event if ( + etype in { + 'addOrderStatus', + 'editOrderStatus', + 'cancelOrderStatus', + } + ): + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + resps, errored = process_status( + event, + oid, + token, + msgs, + last, + ) + # if errored: + # if we rx any error cancel the order again + # await ws.send_msg({ + # 'event': 'cancelOrder', + # 'token': token, + # 'reqid': reqid, + # 'txid': [last.reqid], # txid from submission + # }) + + msgs.extend(resps) + for resp in resps: + await ems_stream.send(resp) + + case _: + log.warning(f'Unhandled trades update msg: {msg}') + + +def process_status( + event: dict[str, str], + oid: str, + token: str, + msgs: list[MsgUnion], + last: MsgUnion, + +) -> tuple[list[MsgUnion], bool]: + ''' + Process `'[add/edit/cancel]OrderStatus'` events by translating to + and returning the equivalent EMS-msg responses. + + ''' + match event: + case { + 'event': etype, + 'status': 'error', + 'reqid': reqid, + 'errorMessage': errmsg, + }: + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.rstrip('OrderStatus') + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + resp = BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=last.reqid, + symbol=getattr(last, 'symbol', 'N/A'), + + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + ) + return [resp], True + + # successful request cases + case { + 'event': 'addOrderStatus', + 'status': "ok", + 'reqid': reqid, # oid from ems side + 'txid': txid, + 'descr': descr, # only on success? + }: + log.info( + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' + f'txid: {txid}\n' + ) + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + return [resp], False + + case { + 'event': 'editOrderStatus', + 'status': "ok", + 'reqid': reqid, # oid from ems side + 'descr': descr, + + # NOTE: for edit request this is a new value + 'txid': txid, + 'originaltxid': origtxid, + }: + log.info( + f'Editting order {oid}[requid={reqid}]:\n' + f'txid: {origtxid} -> {txid}\n' + f'{descr}' + ) + # deliver another ack to update the ems-side `.reqid`. + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + return [resp], False + + case { + "event": "cancelOrderStatus", + "status": "ok", + 'reqid': reqid, + + # XXX: sometimes this isn't provided!? + # 'txid': txids, + **rest, + }: + # TODO: should we support "batch" acking of + # multiple cancels thus avoiding the below loop? 
+ resps: list[MsgUnion] = [] + for txid in rest.get('txid', [last.reqid]): + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=event, + ) + resps.append(resp) + + return resps, False def norm_trade_records( @@ -494,10 +722,9 @@ def norm_trade_records( ) -> list[pp.Transaction]: records: list[pp.Transaction] = [] - for tid, record in ledger.items(): - size = record.get('vol') * { + size = float(record.get('vol')) * { 'buy': 1, 'sell': -1, }[record['type']] @@ -508,7 +735,7 @@ def norm_trade_records( pp.Transaction( fqsn=f'{norm_sym}.kraken', tid=tid, - size=float(size), + size=size, price=float(record['price']), cost=float(record['fee']), dt=pendulum.from_timestamp(float(record['time'])), @@ -522,19 +749,24 @@ def norm_trade_records( return records -async def update_ledger( +@cm +def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], ) -> list[pp.Transaction]: + ''' + Write recent session's trades to the user's (local) ledger file. - # write recent session's trades to the user's (local) ledger file. + ''' with pp.open_trade_ledger( 'kraken', acctid, ) as ledger: - ledger.update(trade_entries) - # normalize to transaction form - records = norm_trade_records(trade_entries) - return records + # normalize to transaction form + records = norm_trade_records(trade_entries) + yield records + + # update on exit + ledger.update(trade_entries) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 71b750825..4966db8a9 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -31,7 +31,6 @@ from fuzzywuzzy import process as fuzzy import numpy as np import pendulum -from pydantic import BaseModel from trio_typing import TaskStatus import tractor import trio @@ -45,6 +44,7 @@ ) from piker.log import get_console_log from piker.data import ShmArray +from piker.data.types import Struct from piker.data._web_bs import open_autorecon_ws, NoBsWs from . 
import log from .api import ( @@ -54,7 +54,7 @@ # https://www.kraken.com/features/api#get-tradable-pairs -class Pair(BaseModel): +class Pair(Struct): altname: str # alternate pair name wsname: str # WebSocket pair name (if available) aclass_base: str # asset class of base component @@ -117,9 +117,8 @@ async def stream_messages( too_slow_count = 0 continue - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - + match msg: + case {'event': 'heartbeat'}: now = time.time() delay = now - last_hb last_hb = now @@ -130,11 +129,20 @@ async def stream_messages( continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg + case { + 'connectionID': _, + 'event': 'systemStatus', + 'status': 'online', + 'version': _, + } as msg: + log.info( + 'WS connection is up:\n' + f'{msg}' + ) + continue + + case _: + yield msg async def process_data_feed_msgs( @@ -145,37 +153,60 @@ async def process_data_feed_msgs( ''' async for msg in stream_messages(ws): + match msg: + case { + 'errorMessage': errmsg + }: + raise BrokerError(errmsg) + + case { + 'event': 'subscriptionStatus', + } as sub: + log.info( + 'WS subscription is active:\n' + f'{sub}' + ) + continue - chan_id, *payload_array, chan_name, pair = msg - - if 'ohlc' in chan_name: - - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + case [ + chan_id, + *payload_array, + chan_name, + pair + ]: + if 'ohlc' in chan_name: + yield 'ohlc', OHLC( + chan_id, + chan_name, + pair, + *payload_array[0] + ) - elif 'spread' in chan_name: + elif 'spread' in chan_name: - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + bid, ask, ts, bsize, asize = map( + float, payload_array[0]) - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) - else: - print(f'UNHANDLED MSG: {msg}') - yield msg + case _: + print(f'UNHANDLED MSG: {msg}') + # yield msg def normalize( @@ -316,7 +347,7 @@ async def stream_quotes( sym = sym.upper() si = Pair(**await client.symbol_info(sym)) # validation - syminfo = si.dict() + syminfo = si.to_dict() syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals syminfo['asset_type'] = 'crypto' @@ -385,7 +416,7 @@ async def subscribe(ws: wsproto.WSConnection): msg_gen = process_data_feed_msgs(ws) # TODO: use ``anext()`` when it lands in 3.10! 
- typ, ohlc_last = await msg_gen.__anext__() + typ, ohlc_last = await anext(msg_gen) topic, quote = normalize(ohlc_last) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 336a9b25f..8e51ed16d 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -22,10 +22,9 @@ from typing import Optional from bidict import bidict -from pydantic import BaseModel, validator -# from msgspec import Struct from ..data._source import Symbol +from ..data.types import Struct from ..pp import Position @@ -41,33 +40,30 @@ ) -class Allocator(BaseModel): - - class Config: - validate_assignment = True - copy_on_model_validation = False - arbitrary_types_allowed = True - - # required to get the account validator lookup working? - extra = 'allow' - underscore_attrs_are_private = False +class Allocator(Struct): symbol: Symbol account: Optional[str] = 'paper' + + _size_units: bidict[str, Optional[str]] = _size_units + # TODO: for enums this clearly doesn't fucking work, you can't set # a default at startup by passing in a `dict` but yet you can set # that value through assignment..for wtv cucked reason.. honestly, pure # unintuitive garbage. - size_unit: str = 'currency' - _size_units: dict[str, Optional[str]] = _size_units + _size_unit: str = 'currency' - @validator('size_unit', pre=True) - def maybe_lookup_key(cls, v): - # apply the corresponding enum key for the text "description" value + @property + def size_unit(self) -> str: + return self._size_unit + + @size_unit.setter + def size_unit(self, v: str) -> Optional[str]: if v not in _size_units: - return _size_units.inverse[v] + v = _size_units.inverse[v] assert v in _size_units + self._size_unit = v return v # TODO: if we ever want ot support non-uniform entry-slot-proportion @@ -262,7 +258,7 @@ def mk_allocator( # default allocation settings defaults: dict[str, float] = { 'account': None, # select paper by default - 'size_unit': 'currency', + # 'size_unit': 'currency', 'units_limit': 400, 'currency_limit': 5e3, 'slots': 4, @@ -301,6 +297,9 @@ def mk_allocator( # entry step 1.0 alloc.units_limit = alloc.slots + else: + alloc.size_unit = 'currency' + # if the current position is already greater then the limit # settings, increase the limit to the current position if alloc.size_unit == 'currency': diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 837c28bce..91cb94fac 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -58,11 +58,11 @@ class OrderBook: def send( self, - msg: Order, + msg: Order | dict, ) -> dict: self._sent_orders[msg.oid] = msg - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) return msg def update( @@ -73,9 +73,8 @@ def update( ) -> dict: cmd = self._sent_orders[uuid] - msg = cmd.dict() - msg.update(data) - self._sent_orders[uuid] = Order(**msg) + msg = cmd.copy(update=data) + self._sent_orders[uuid] = msg self._to_ems.send_nowait(msg) return cmd @@ -88,7 +87,7 @@ def cancel(self, uuid: str) -> bool: oid=uuid, symbol=cmd.symbol, ) - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) _orders: OrderBook = None @@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code( book = get_orders() async with book._from_order_book.subscribe() as orders_stream: async for cmd in orders_stream: - if cmd['symbol'] == symbol_key: + if cmd.symbol == symbol_key: log.info(f'Send order cmd:\n{pformat(cmd)}') # send msg over IPC / wire await to_ems_stream.send(cmd) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 
e0917c852..9b8e09341 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,12 +20,12 @@ """ from contextlib import asynccontextmanager from dataclasses import dataclass, field +from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable from bidict import bidict -from pydantic import BaseModel import trio from trio_typing import TaskStatus import tractor @@ -33,6 +33,7 @@ from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed, maybe_open_feed +from ..data.types import Struct from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( @@ -87,7 +88,8 @@ def check_lt(price: float) -> bool: @dataclass class _DarkBook: - '''EMS-trigger execution book. + ''' + EMS-trigger execution book. Contains conditions for executions (aka "orders" or "triggers") which are not exposed to brokers and thus the market; i.e. these are @@ -230,7 +232,7 @@ async def clear_dark_triggers( price=submit_price, size=cmd['size'], ) - await brokerd_orders_stream.send(msg.dict()) + await brokerd_orders_stream.send(msg) # mark this entry as having sent an order # request. the entry will be replaced once the @@ -246,14 +248,11 @@ async def clear_dark_triggers( msg = Status( oid=oid, # ems order id - resp=resp, time_ns=time.time_ns(), - symbol=fqsn, + resp=resp, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message - - ).dict() + brokerd_msg=cmd, + ) # remove exec-condition from set log.info(f'removing pred for {oid}') @@ -302,7 +301,7 @@ class TradesRelay: consumers: int = 0 -class Router(BaseModel): +class Router(Struct): ''' Order router which manages and tracks per-broker dark book, alerts, clearing and related data feed management. @@ -323,10 +322,6 @@ class Router(BaseModel): # brokername to trades-dialogues streams with ``brokerd`` actors relays: dict[str, TradesRelay] = {} - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False - def get_dark_book( self, brokername: str, @@ -580,11 +575,11 @@ async def translate_and_relay_brokerd_events( if name == 'position': - pos_msg = BrokerdPosition(**brokerd_msg).dict() + pos_msg = BrokerdPosition(**brokerd_msg) # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - sym, broker = pos_msg['symbol'], pos_msg['broker'] + sym, broker = pos_msg.symbol, pos_msg.broker relay.positions.setdefault( # NOTE: translate to a FQSN! @@ -651,6 +646,13 @@ async def translate_and_relay_brokerd_events( else: # check for existing live flow entry entry = book._ems_entries.get(oid) + old_reqid = entry.reqid + + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' + ) # initial response to brokerd order request if name == 'ack': @@ -661,6 +663,10 @@ async def translate_and_relay_brokerd_events( # a ``BrokerdOrderAck`` **must** be sent after an order # request in order to establish this id mapping. 
book._ems2brokerd_ids[oid] = reqid + log.info( + 'Rx ACK for order\n' + f'oid: {oid} -> reqid: {reqid}' + ) # new order which has not yet be registered into the # local ems book, insert it now and handle 2 cases: @@ -675,7 +681,7 @@ async def translate_and_relay_brokerd_events( entry.reqid = reqid # tell broker to cancel immediately - await brokerd_trades_stream.send(entry.dict()) + await brokerd_trades_stream.send(entry) # - the order is now active and will be mirrored in # our book -> registered as live flow @@ -715,7 +721,7 @@ async def translate_and_relay_brokerd_events( # if 10147 in message: cancel resp = 'broker_errored' - broker_details = msg.dict() + broker_details = msg # don't relay message to order requester client # continue @@ -750,7 +756,7 @@ async def translate_and_relay_brokerd_events( resp = 'broker_' + msg.status # pass the BrokerdStatus msg inside the broker details field - broker_details = msg.dict() + broker_details = msg elif name in ( 'fill', @@ -759,7 +765,7 @@ async def translate_and_relay_brokerd_events( # proxy through the "fill" result(s) resp = 'broker_filled' - broker_details = msg.dict() + broker_details = msg log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') @@ -777,7 +783,7 @@ async def translate_and_relay_brokerd_events( time_ns=time.time_ns(), broker_reqid=reqid, brokerd_msg=broker_details, - ).dict() + ) ) except KeyError: log.error( @@ -849,7 +855,7 @@ async def process_client_order_cmds( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) else: # this might be a cancel for an order that hasn't been @@ -871,7 +877,7 @@ async def process_client_order_cmds( resp='dark_cancelled', oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) # de-register this client dialogue router.dialogues.pop(oid) @@ -926,7 +932,7 @@ async def process_client_order_cmds( # handle relaying the ems side responses back to # the client/cmd sender from this request log.info(f'Sending live order to {broker}:\n{pformat(msg)}') - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) # an immediate response should be ``BrokerdOrderAck`` # with ems order id from the ``trades_dialogue()`` @@ -951,6 +957,12 @@ async def process_client_order_cmds( # like every other shitty tina platform that makes # the user choose the predicate operator. last = dark_book.lasts[fqsn] + + # sometimes the real-time feed hasn't come up + # so just pull from the latest history. + if isnan(last): + last = feed.shm.array[-1]['close'] + pred = mk_check(trigger_price, last, action) spread_slap: float = 5 @@ -1000,7 +1012,7 @@ async def process_client_order_cmds( resp=resp, oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) @@ -1138,8 +1150,14 @@ async def _emsd_main( ) finally: - # remove client from "registry" - _router.clients.remove(ems_client_order_stream) + # try to remove client from "registry" + try: + _router.clients.remove(ems_client_order_stream) + except KeyError: + log.warning( + f'Stream {ems_client_order_stream._ctx.chan.uid}' + ' was already dropped?' + ) dialogues = _router.dialogues diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 4bb0be00d..0d92a5f13 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -20,16 +20,15 @@ """ from typing import Optional, Union -# TODO: try out just encoding/send direction for now? 
-# import msgspec -from pydantic import BaseModel - from ..data._source import Symbol +from ..data.types import Struct -# Client -> emsd +# -------------- +# Client -> emsd +# -------------- -class Cancel(BaseModel): +class Cancel(Struct): '''Cancel msg for removing a dark (ems triggered) or broker-submitted (live) trigger/order. @@ -39,7 +38,7 @@ class Cancel(BaseModel): symbol: str -class Order(BaseModel): +class Order(Struct): action: str # {'buy', 'sell', 'alert'} # internal ``emdsd`` unique "order id" @@ -59,20 +58,14 @@ class Order(BaseModel): # the backend broker exec_mode: str # {'dark', 'live', 'paper'} - class Config: - # just for pre-loading a ``Symbol`` when used - # in the order mode staging process - arbitrary_types_allowed = True - # don't copy this model instance when used in - # a recursive model - copy_on_model_validation = False +# -------------- # Client <- emsd +# -------------- # update msgs from ems which relay state change info # from the active clearing engine. - -class Status(BaseModel): +class Status(Struct): name: str = 'status' oid: str # uuid4 @@ -95,8 +88,6 @@ class Status(BaseModel): # } resp: str # "response", see above - # symbol: str - # trigger info trigger_price: Optional[float] = None # price: float @@ -111,10 +102,12 @@ class Status(BaseModel): brokerd_msg: dict = {} +# --------------- # emsd -> brokerd +# --------------- # requests *sent* from ems to respective backend broker daemon -class BrokerdCancel(BaseModel): +class BrokerdCancel(Struct): action: str = 'cancel' oid: str # piker emsd order id @@ -130,7 +123,7 @@ class BrokerdCancel(BaseModel): reqid: Optional[Union[int, str]] = None -class BrokerdOrder(BaseModel): +class BrokerdOrder(Struct): action: str # {buy, sell} oid: str @@ -150,11 +143,12 @@ class BrokerdOrder(BaseModel): size: float +# --------------- # emsd <- brokerd +# --------------- # requests *received* to ems from broker backend - -class BrokerdOrderAck(BaseModel): +class BrokerdOrderAck(Struct): ''' Immediate reponse to a brokerd order request providing the broker specific unique order id so that the EMS can associate this @@ -172,7 +166,7 @@ class BrokerdOrderAck(BaseModel): account: str = '' -class BrokerdStatus(BaseModel): +class BrokerdStatus(Struct): name: str = 'status' reqid: Union[int, str] @@ -205,7 +199,7 @@ class BrokerdStatus(BaseModel): } -class BrokerdFill(BaseModel): +class BrokerdFill(Struct): ''' A single message indicating a "fill-details" event from the broker if avaiable. @@ -230,7 +224,7 @@ class BrokerdFill(BaseModel): broker_time: float -class BrokerdError(BaseModel): +class BrokerdError(Struct): ''' Optional error type that can be relayed to emsd for error handling. @@ -249,7 +243,7 @@ class BrokerdError(BaseModel): broker_details: dict = {} -class BrokerdPosition(BaseModel): +class BrokerdPosition(Struct): '''Position update event from brokerd. ''' diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index cf5808767..944f933bc 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -30,8 +30,8 @@ import tractor from dataclasses import dataclass -from .. 
import data from ..data._source import Symbol +from ..data.feed import open_feed from ..pp import Position from ..data._normalize import iterticks from ..data._source import unpack_fqsn @@ -117,7 +117,7 @@ async def submit_limit( reason='paper_trigger', remaining=size, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # if we're already a clearing price simulate an immediate fill if ( @@ -173,7 +173,7 @@ async def submit_cancel( broker=self.broker, time_ns=time.time_ns(), ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) async def fake_fill( self, @@ -216,7 +216,7 @@ async def fake_fill( 'name': self.broker + '_paper', }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) if order_complete: @@ -240,7 +240,7 @@ async def fake_fill( 'name': self.broker, }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # lookup any existing position token = f'{symbol}.{self.broker}' @@ -268,7 +268,7 @@ async def fake_fill( ) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) - await self.ems_trades_stream.send(pp_msg.dict()) + await self.ems_trades_stream.send(pp_msg) async def simulate_fills( @@ -384,7 +384,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'Paper only. No account found: `{account}` ?', - ).dict()) + )) continue # validate @@ -416,7 +416,7 @@ async def handle_order_requests( # broker specific request id reqid=reqid, - ).dict() + ) ) elif action == 'cancel': @@ -441,14 +441,11 @@ async def trades_dialogue( ) -> None: tractor.log.get_console_log(loglevel) - async with ( - - data.open_feed( - [fqsn], - loglevel=loglevel, - ) as feed, + async with open_feed( + [fqsn], + loglevel=loglevel, + ) as feed: - ): # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` # await ctx.started(all_positions) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index e98195b46..55dca991b 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -30,19 +30,19 @@ get_shm_token, ShmArray, ) -from .feed import ( - open_feed, - _setup_persistent_brokerd, -) +# from .feed import ( +# # open_feed, +# _setup_persistent_brokerd, +# ) __all__ = [ - 'open_feed', + # 'open_feed', 'ShmArray', 'iterticks', 'maybe_open_shm_array', 'attach_shm_array', 'open_shm_array', 'get_shm_token', - '_setup_persistent_brokerd', + # '_setup_persistent_brokerd', ] diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 98b3b41f9..d04d73a2e 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -37,6 +37,7 @@ from docker.errors import ( DockerException, APIError, + ContainerError, ) from requests.exceptions import ConnectionError, ReadTimeout @@ -50,10 +51,6 @@ class DockerNotStarted(Exception): 'Prolly you dint start da daemon bruh' -class ContainerError(RuntimeError): - 'Error reported via app-container logging level' - - @acm async def open_docker( url: Optional[str] = None, @@ -96,9 +93,9 @@ def unpack_msg(err: Exception) -> str: # not perms? raise - finally: - if client: - client.close() + # finally: + # if client: + # client.close() class Container: @@ -185,6 +182,21 @@ def try_signal( if 'is not running' in err.explanation: return False + def hard_kill(self, start: float) -> None: + delay = time.time() - start + log.error( + f'Failed to kill container {self.cntr.id} after {delay}s\n' + 'sending SIGKILL..' 
+ ) + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + self.cntr.wait( + timeout=3, + condition='not-running', + ) + async def cancel( self, stop_msg: str, @@ -231,21 +243,9 @@ async def cancel( ConnectionError, ): log.exception('Docker connection failure') - break + self.hard_kill(start) else: - delay = time.time() - start - log.error( - f'Failed to kill container {cid} after {delay}s\n' - 'sending SIGKILL..' - ) - # get out the big guns, bc apparently marketstore - # doesn't actually know how to terminate gracefully - # :eyeroll:... - self.try_signal('SIGKILL') - self.cntr.wait( - timeout=3, - condition='not-running', - ) + self.hard_kill(start) log.cancel(f'Container stopped: {cid}') diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 1172fc7b3..8ea51eb75 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -27,13 +27,14 @@ if _USE_POSIX: from _posixshmem import shm_unlink -import tractor +# import msgspec import numpy as np -from pydantic import BaseModel from numpy.lib import recfunctions as rfn +import tractor from ..log import get_logger from ._source import base_iohlc_dtype +from .types import Struct log = get_logger(__name__) @@ -107,15 +108,12 @@ def destroy(self) -> None: log.warning(f'Shm for {name} already unlinked?') -class _Token(BaseModel): +class _Token(Struct, frozen=True): ''' Internal represenation of a shared memory "token" which can be used to key a system wide post shm entry. ''' - class Config: - frozen = True - shm_name: str # this servers as a "key" value shm_first_index_name: str shm_last_index_name: str @@ -126,17 +124,25 @@ def dtype(self) -> np.dtype: return np.dtype(list(map(tuple, self.dtype_descr))).descr def as_msg(self): - return self.dict() + return self.to_dict() @classmethod def from_msg(cls, msg: dict) -> _Token: + + # TODO: native struct decoding + # return _token_dec.decode(msg) + if isinstance(msg, _Token): return msg + # assert 0 + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) return _Token(**msg) +# _token_dec = msgspec.msgpack.Decoder(_Token) + # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ContextStack('_known_tokens', ) @@ -167,7 +173,7 @@ def _make_token( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", - dtype_descr=np.dtype(dtype).descr + dtype_descr=tuple(np.dtype(dtype).descr) ) diff --git a/piker/data/feed.py b/piker/data/feed.py index 66ced72fc..2795535d9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -42,7 +42,6 @@ import trimeter import tractor from tractor.trionics import maybe_open_context -from pydantic import BaseModel import pendulum import numpy as np @@ -59,6 +58,7 @@ ShmArray, ) from .ingest import get_ingestormod +from .types import Struct from ._source import ( base_iohlc_dtype, Symbol, @@ -84,7 +84,7 @@ log = get_logger(__name__) -class _FeedsBus(BaseModel): +class _FeedsBus(Struct): ''' Data feeds broadcaster and persistence management. @@ -100,10 +100,6 @@ class _FeedsBus(BaseModel): a dedicated cancel scope. 
''' - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False - brokername: str nursery: trio.Nursery feeds: dict[str, tuple[dict, dict]] = {} diff --git a/piker/data/types.py b/piker/data/types.py new file mode 100644 index 000000000..c6cba61d1 --- /dev/null +++ b/piker/data/types.py @@ -0,0 +1,68 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Built-in (extension) types. + +""" +from typing import Optional +from pprint import pformat + +import msgspec + + +class Struct( + msgspec.Struct, + + # https://jcristharif.com/msgspec/structs.html#tagged-unions + # tag='pikerstruct', + # tag=True, +): + ''' + A "human friendlier" (aka repl buddy) struct subtype. + + ''' + def to_dict(self) -> dict: + return { + f: getattr(self, f) + for f in self.__struct_fields__ + } + + def __repr__(self): + return f'Struct({pformat(self.to_dict())})' + + def copy( + self, + update: Optional[dict] = None, + + ) -> msgspec.Struct: + ''' + Validate-typecast all self defined fields, return a copy of us + with all such fields. + + This is kinda like the default behaviour in `pydantic.BaseModel`. + + ''' + if update: + for k, v in update.items(): + setattr(self, k, v) + + # roundtrip serialize to validate + return msgspec.msgpack.Decoder( + type=type(self) + ).decode( + msgspec.msgpack.Encoder().encode(self) + ) diff --git a/piker/ui/_event.py b/piker/ui/_event.py index f9982843c..9c006dc86 100644 --- a/piker/ui/_event.py +++ b/piker/ui/_event.py @@ -21,7 +21,6 @@ from contextlib import asynccontextmanager, AsyncExitStack from typing import Callable -from pydantic import BaseModel import trio from PyQt5 import QtCore from PyQt5.QtCore import QEvent, pyqtBoundSignal @@ -30,6 +29,8 @@ QGraphicsSceneMouseEvent as gs_mouse, ) +from ..data.types import Struct + MOUSE_EVENTS = { gs_mouse.GraphicsSceneMousePress, @@ -43,13 +44,10 @@ # TODO: maybe consider some constrained ints down the road? # https://pydantic-docs.helpmanual.io/usage/types/#constrained-types -class KeyboardMsg(BaseModel): +class KeyboardMsg(Struct): '''Unpacked Qt keyboard event data. ''' - class Config: - arbitrary_types_allowed = True - event: QEvent etype: int key: int @@ -57,16 +55,13 @@ class Config: txt: str def to_tuple(self) -> tuple: - return tuple(self.dict().values()) + return tuple(self.to_dict().values()) -class MouseMsg(BaseModel): +class MouseMsg(Struct): '''Unpacked Qt keyboard event data. 
''' - class Config: - arbitrary_types_allowed = True - event: QEvent etype: int button: int diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index c6d095944..f62363f3e 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -619,7 +619,7 @@ def set_slots( # color: #19232D; # width: 10px; - self.setRange(0, slots) + self.setRange(0, int(slots)) self.setValue(value) diff --git a/piker/ui/_orm.py b/piker/ui/_orm.py index 67050e95e..8dea0b6d5 100644 --- a/piker/ui/_orm.py +++ b/piker/ui/_orm.py @@ -22,12 +22,9 @@ from typing import ( Optional, Generic, TypeVar, Callable, - Literal, ) -import enum -import sys -from pydantic import BaseModel, validator +# from pydantic import BaseModel, validator from pydantic.generics import GenericModel from PyQt5.QtWidgets import ( QWidget, @@ -38,6 +35,7 @@ # FontScaledDelegate, Edit, ) +from ..data.types import Struct DataType = TypeVar('DataType') @@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]): options: dict[str, DataType] # value: DataType = None - @validator('value') # , always=True) + # @validator('value') # , always=True) def set_value_first( cls, @@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]): widget_factory = Edit -class AllocatorPane(BaseModel): +class AllocatorPane(Struct): account = Selection[str]( options=dict.fromkeys( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index e9ed3499a..83a0ed485 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -27,7 +27,6 @@ from typing import Optional, Dict, Callable, Any import uuid -from pydantic import BaseModel import tractor import trio from PyQt5.QtCore import Qt @@ -41,6 +40,7 @@ from ._style import _font from ..data._source import Symbol from ..data.feed import Feed +from ..data.types import Struct from ..log import get_logger from ._editors import LineEditor, ArrowEditor from ._lines import order_line, LevelLine @@ -58,7 +58,7 @@ log = get_logger(__name__) -class OrderDialog(BaseModel): +class OrderDialog(Struct): ''' Trade dialogue meta-data describing the lifetime of an order submission to ``emsd`` from a chart. @@ -73,10 +73,6 @@ class OrderDialog(BaseModel): msgs: dict[str, dict] = {} fills: Dict[str, Any] = {} - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False - def on_level_change_update_next_order_info( @@ -268,7 +264,8 @@ def submit_order( self, ) -> OrderDialog: - '''Send execution order to EMS return a level line to + ''' + Send execution order to EMS return a level line to represent the order on a chart. ''' @@ -277,13 +274,9 @@ def submit_order( oid = str(uuid.uuid4()) # format order data for ems - fqsn = symbol.front_fqsn() - order = staged.copy( - update={ - 'symbol': fqsn, - 'oid': oid, - } - ) + order = staged.copy() + order.oid = oid + order.symbol = symbol.front_fqsn() line = self.line_from_order( order, @@ -858,7 +851,9 @@ async def process_trades_and_update_ui( # delete level line from view mode.on_cancel(oid) broker_msg = msg['brokerd_msg'] - log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}') + log.warning( + f'Order {oid} failed with:\n{pformat(broker_msg)}' + ) elif resp in ( 'dark_triggered'
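
A minimal usage sketch (not part of the patch) of the new ``msgspec``-based ``Struct``
helper added in ``piker/data/types.py`` above; the ``Tick`` type and its fields are
hypothetical and only illustrate the ``to_dict()`` / ``copy(update=...)`` behaviour
shown in the diff:

.. code:: python

    from piker.data.types import Struct  # module added in this diff

    class Tick(Struct):       # hypothetical example type
        price: float
        size: float

    t = Tick(price=103.97, size=4.8)
    print(t.to_dict())        # plain dict of the declared struct fields

    # ``copy()`` assigns the updates then round-trips through
    # msgspec encode/decode, which is where type validation happens
    t2 = t.copy(update={'size': 5.0})
    assert isinstance(t2, Tick) and t2.size == 5.0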