diff --git a/piker/_daemon.py b/piker/_daemon.py
index 82dc848a0..836ce60ce 100644
--- a/piker/_daemon.py
+++ b/piker/_daemon.py
@@ -22,10 +22,10 @@
from contextlib import asynccontextmanager as acm
from collections import defaultdict
-from pydantic import BaseModel
+from msgspec import Struct
+import tractor
import trio
from trio_typing import TaskStatus
-import tractor
from .log import get_logger, get_console_log
from .brokers import get_brokermod
@@ -47,16 +47,13 @@
]
-class Services(BaseModel):
+class Services(Struct):
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
- class Config:
- arbitrary_types_allowed = True
-
async def start_service_task(
self,
name: str,
diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py
index e528172ec..add23b185 100644
--- a/piker/brokers/binance.py
+++ b/piker/brokers/binance.py
@@ -34,13 +34,13 @@
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
-from pydantic import BaseModel
import wsproto
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
+from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@@ -79,12 +79,14 @@
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
-class Pair(BaseModel):
+class Pair(Struct, frozen=True):
symbol: str
status: str
baseAsset: str
baseAssetPrecision: int
+ cancelReplaceAllowed: bool
+ allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
@@ -287,7 +289,7 @@ async def get_client() -> Client:
# validation type
-class AggTrade(BaseModel):
+class AggTrade(Struct):
e: str # Event type
E: int # Event time
s: str # Symbol
@@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade':
- # validate
+ # NOTE: this is purely for a definition, ``msgspec.Struct``
+ # does not runtime-validate until you decode/encode.
+ # see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg)
# TODO: type out and require this quote format
@@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
- 'price': msg.p,
- 'size': msg.q,
+ 'price': float(msg.p),
+ 'size': float(msg.q),
'broker_ts': msg.T,
}],
}
@@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()]
syminfo = Pair(**d) # validation
- si = sym_infos[sym] = syminfo.dict()
+ si = sym_infos[sym] = syminfo.to_dict()
# XXX: after manually inspecting the response format we
# just directly pick out the info we need
diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py
index b768c9a1b..4737d3768 100644
--- a/piker/brokers/ib/broker.py
+++ b/piker/brokers/ib/broker.py
@@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
- ).dict())
+ ))
continue
client = _accounts2clients.get(account)
@@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
- ).dict())
+ ))
continue
if action in {'buy', 'sell'}:
@@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
- ).dict())
+ ))
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
- time_ns=time.time_ns(),
account=account,
- ).dict()
+ )
)
elif action == 'cancel':
@@ -559,7 +558,7 @@ async def open_stream(
cids2pps,
validate=True,
)
- all_positions.extend(msg.dict() for msg in msgs)
+ all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
@@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break
- await ems_stream.send(msg.dict())
+ await ems_stream.send(msg)
async def deliver_trade_events(
@@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
- await ems_stream.send(msg.dict())
+ await ems_stream.send(msg)
case 'fill':
@@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],
)
- await ems_stream.send(msg.dict())
+ await ems_stream.send(msg)
# 2 cases:
# - fill comes first or
@@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
- # await ems_stream.send(msg.dict())
+ # await ems_stream.send(msg)
case 'event':
@@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
- # log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+ # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid)
diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py
index 4e2e02f64..588a09248 100644
--- a/piker/brokers/kraken/broker.py
+++ b/piker/brokers/kraken/broker.py
@@ -32,7 +32,6 @@
)
import pendulum
-from pydantic import BaseModel
import trio
import tractor
import wsproto
@@ -47,6 +46,7 @@
BrokerdPosition,
BrokerdStatus,
)
+from piker.data.types import Struct
from . import log
from .api import (
Client,
@@ -62,7 +62,7 @@
)
-class Trade(BaseModel):
+class Trade(Struct):
'''
Trade class that helps parse and validate ownTrades stream
@@ -110,7 +110,7 @@ async def handle_order_requests(
'https://github.com/pikers/piker/issues/299'
),
- ).dict())
+ ))
continue
# validate
@@ -136,7 +136,7 @@ async def handle_order_requests(
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
- ).dict()
+ )
)
else:
# TODO: handle multiple orders (cancels?)
@@ -161,7 +161,7 @@ async def handle_order_requests(
# account the made the order
account=order.account
- ).dict()
+ )
)
elif action == 'cancel':
@@ -189,7 +189,7 @@ async def handle_order_requests(
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
- ).dict()
+ )
)
if not error:
@@ -217,7 +217,7 @@ async def handle_order_requests(
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
- ).dict()
+ )
)
else: # order cancel success case.
@@ -230,7 +230,7 @@ async def handle_order_requests(
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
- ).dict()
+ )
)
else:
log.error(f'Unknown order command: {request_msg}')
@@ -330,7 +330,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
- position_msgs.append(msg.dict())
+ position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
@@ -408,7 +408,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'},
broker_time=broker_time
)
- await ems_stream.send(fill_msg.dict())
+ await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
reqid=reqid,
@@ -432,7 +432,7 @@ async def trades_dialogue(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
- await ems_stream.send(filled_msg.dict())
+ await ems_stream.send(filled_msg)
# update ledger and position tracking
trans = await update_ledger(acctid, trades)
@@ -469,7 +469,7 @@ async def trades_dialogue(
# TODO
# currency=''
)
- await ems_stream.send(pp_msg.dict())
+ await ems_stream.send(pp_msg)
case [
trades_msgs,
diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py
index 71b750825..e52f49aaf 100644
--- a/piker/brokers/kraken/feed.py
+++ b/piker/brokers/kraken/feed.py
@@ -31,7 +31,6 @@
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
-from pydantic import BaseModel
from trio_typing import TaskStatus
import tractor
import trio
@@ -45,6 +44,7 @@
)
from piker.log import get_console_log
from piker.data import ShmArray
+from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
@@ -54,7 +54,7 @@
# https://www.kraken.com/features/api#get-tradable-pairs
-class Pair(BaseModel):
+class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
@@ -316,7 +316,7 @@ async def stream_quotes(
sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation
- syminfo = si.dict()
+ syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py
index 336a9b25f..8e51ed16d 100644
--- a/piker/clearing/_allocate.py
+++ b/piker/clearing/_allocate.py
@@ -22,10 +22,9 @@
from typing import Optional
from bidict import bidict
-from pydantic import BaseModel, validator
-# from msgspec import Struct
from ..data._source import Symbol
+from ..data.types import Struct
from ..pp import Position
@@ -41,33 +40,30 @@
)
-class Allocator(BaseModel):
-
- class Config:
- validate_assignment = True
- copy_on_model_validation = False
- arbitrary_types_allowed = True
-
- # required to get the account validator lookup working?
- extra = 'allow'
- underscore_attrs_are_private = False
+class Allocator(Struct):
symbol: Symbol
account: Optional[str] = 'paper'
+
+ _size_units: bidict[str, Optional[str]] = _size_units
+
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
- size_unit: str = 'currency'
- _size_units: dict[str, Optional[str]] = _size_units
+ _size_unit: str = 'currency'
- @validator('size_unit', pre=True)
- def maybe_lookup_key(cls, v):
- # apply the corresponding enum key for the text "description" value
+ @property
+ def size_unit(self) -> str:
+ return self._size_unit
+
+ @size_unit.setter
+ def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units:
- return _size_units.inverse[v]
+ v = _size_units.inverse[v]
assert v in _size_units
+ self._size_unit = v
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
@@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
- 'size_unit': 'currency',
+ # 'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0
alloc.units_limit = alloc.slots
+ else:
+ alloc.size_unit = 'currency'
+
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py
index 837c28bce..91cb94fac 100644
--- a/piker/clearing/_client.py
+++ b/piker/clearing/_client.py
@@ -58,11 +58,11 @@ class OrderBook:
def send(
self,
- msg: Order,
+ msg: Order | dict,
) -> dict:
self._sent_orders[msg.oid] = msg
- self._to_ems.send_nowait(msg.dict())
+ self._to_ems.send_nowait(msg)
return msg
def update(
@@ -73,9 +73,8 @@ def update(
) -> dict:
cmd = self._sent_orders[uuid]
- msg = cmd.dict()
- msg.update(data)
- self._sent_orders[uuid] = Order(**msg)
+ msg = cmd.copy(update=data)
+ self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd
@@ -88,7 +87,7 @@ def cancel(self, uuid: str) -> bool:
oid=uuid,
symbol=cmd.symbol,
)
- self._to_ems.send_nowait(msg.dict())
+ self._to_ems.send_nowait(msg)
_orders: OrderBook = None
@@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
- if cmd['symbol'] == symbol_key:
+ if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 051ad30e1..2b9f50cd0 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -26,7 +26,6 @@
from typing import AsyncIterator, Callable
from bidict import bidict
-from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
@@ -34,6 +33,7 @@
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed
+from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
@@ -231,7 +231,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
- await brokerd_orders_stream.send(msg.dict())
+ await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@@ -247,14 +247,11 @@ async def clear_dark_triggers(
msg = Status(
oid=oid, # ems order id
- resp=resp,
time_ns=time.time_ns(),
- symbol=fqsn,
+ resp=resp,
trigger_price=price,
- broker_details={'name': broker},
- cmd=cmd, # original request message
-
- ).dict()
+ brokerd_msg=cmd,
+ )
# remove exec-condition from set
log.info(f'removing pred for {oid}')
@@ -303,7 +300,7 @@ class TradesRelay:
consumers: int = 0
-class Router(BaseModel):
+class Router(Struct):
'''
Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
@@ -324,10 +321,6 @@ class Router(BaseModel):
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
- class Config:
- arbitrary_types_allowed = True
- underscore_attrs_are_private = False
-
def get_dark_book(
self,
brokername: str,
@@ -581,11 +574,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
- pos_msg = BrokerdPosition(**brokerd_msg).dict()
+ pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
- sym, broker = pos_msg['symbol'], pos_msg['broker']
+ sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
@@ -676,7 +669,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid
# tell broker to cancel immediately
- await brokerd_trades_stream.send(entry.dict())
+ await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow
@@ -716,7 +709,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel
resp = 'broker_errored'
- broker_details = msg.dict()
+ broker_details = msg
# don't relay message to order requester client
# continue
@@ -751,7 +744,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
- broker_details = msg.dict()
+ broker_details = msg
elif name in (
'fill',
@@ -760,7 +753,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s)
resp = 'broker_filled'
- broker_details = msg.dict()
+ broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@@ -778,7 +771,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
- ).dict()
+ )
)
except KeyError:
log.error(
@@ -843,14 +836,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
- if reqid:
+ if reqid is not None:
# send cancel to brokerd immediately!
log.info(
f'Submitting cancel for live order {reqid}'
)
- await brokerd_order_stream.send(msg.dict())
+ await brokerd_order_stream.send(msg)
else:
# this might be a cancel for an order that hasn't been
@@ -872,7 +865,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
- ).dict()
+ )
)
# de-register this client dialogue
router.dialogues.pop(oid)
@@ -927,7 +920,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
- await brokerd_order_stream.send(msg.dict())
+ await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
@@ -1007,7 +1000,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
- ).dict()
+ )
)
diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py
index 4bb0be00d..e5813c787 100644
--- a/piker/clearing/_messages.py
+++ b/piker/clearing/_messages.py
@@ -1,5 +1,5 @@
# piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -15,21 +15,26 @@
# along with this program. If not, see .
"""
-Clearing system messagingn types and protocols.
+Clearing sub-system message and protocols.
"""
from typing import Optional, Union
-# TODO: try out just encoding/send direction for now?
-# import msgspec
-from pydantic import BaseModel
-
from ..data._source import Symbol
+from ..data.types import Struct
-# Client -> emsd
+# TODO: ``msgspec`` stuff worth paying attention to:
+# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
+# - use literals for a common msg determined by diff keys?
+# - https://jcristharif.com/msgspec/usage.html#literal
+# - for eg. ``BrokerdStatus``, instead just have separate messages?
+
+# --------------
+# Client -> emsd
+# --------------
-class Cancel(BaseModel):
+class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
@@ -39,8 +44,10 @@ class Cancel(BaseModel):
symbol: str
-class Order(BaseModel):
+class Order(Struct):
+ # TODO: use ``msgspec.Literal``
+ # https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
@@ -48,6 +55,9 @@ class Order(BaseModel):
account: str # should we set a default as '' ?
price: float
+ # TODO: could we drop the ``.action`` field above and instead just
+ # use +/- values here? Would make the msg smaller at the sake of a
+ # teensie fp precision?
size: float
brokers: list[str]
@@ -59,20 +69,14 @@ class Order(BaseModel):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
- class Config:
- # just for pre-loading a ``Symbol`` when used
- # in the order mode staging process
- arbitrary_types_allowed = True
- # don't copy this model instance when used in
- # a recursive model
- copy_on_model_validation = False
+# --------------
# Client <- emsd
+# --------------
# update msgs from ems which relay state change info
# from the active clearing engine.
-
-class Status(BaseModel):
+class Status(Struct):
name: str = 'status'
oid: str # uuid4
@@ -95,8 +99,6 @@ class Status(BaseModel):
# }
resp: str # "response", see above
- # symbol: str
-
# trigger info
trigger_price: Optional[float] = None
# price: float
@@ -111,10 +113,12 @@ class Status(BaseModel):
brokerd_msg: dict = {}
+# ---------------
# emsd -> brokerd
+# ---------------
# requests *sent* from ems to respective backend broker daemon
-class BrokerdCancel(BaseModel):
+class BrokerdCancel(Struct):
action: str = 'cancel'
oid: str # piker emsd order id
@@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None
-class BrokerdOrder(BaseModel):
+class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
@@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
size: float
+# ---------------
# emsd <- brokerd
+# ---------------
# requests *received* to ems from broker backend
-
-class BrokerdOrderAck(BaseModel):
+class BrokerdOrderAck(Struct):
'''
Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this
@@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
account: str = ''
-class BrokerdStatus(BaseModel):
+class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
@@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
}
-class BrokerdFill(BaseModel):
+class BrokerdFill(Struct):
'''
A single message indicating a "fill-details" event from the broker
if avaiable.
@@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
broker_time: float
-class BrokerdError(BaseModel):
+class BrokerdError(Struct):
'''
Optional error type that can be relayed to emsd for error handling.
@@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {}
-class BrokerdPosition(BaseModel):
+class BrokerdPosition(Struct):
'''Position update event from brokerd.
'''
diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py
index cf5808767..802dcf469 100644
--- a/piker/clearing/_paper_engine.py
+++ b/piker/clearing/_paper_engine.py
@@ -117,7 +117,7 @@ async def submit_limit(
reason='paper_trigger',
remaining=size,
)
- await self.ems_trades_stream.send(msg.dict())
+ await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill
if (
@@ -173,7 +173,7 @@ async def submit_cancel(
broker=self.broker,
time_ns=time.time_ns(),
)
- await self.ems_trades_stream.send(msg.dict())
+ await self.ems_trades_stream.send(msg)
async def fake_fill(
self,
@@ -216,7 +216,7 @@ async def fake_fill(
'name': self.broker + '_paper',
},
)
- await self.ems_trades_stream.send(msg.dict())
+ await self.ems_trades_stream.send(msg)
if order_complete:
@@ -240,7 +240,7 @@ async def fake_fill(
'name': self.broker,
},
)
- await self.ems_trades_stream.send(msg.dict())
+ await self.ems_trades_stream.send(msg)
# lookup any existing position
token = f'{symbol}.{self.broker}'
@@ -268,7 +268,7 @@ async def fake_fill(
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
- await self.ems_trades_stream.send(pp_msg.dict())
+ await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
@@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
- ).dict())
+ ))
continue
# validate
@@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
- ).dict()
+ )
)
elif action == 'cancel':
diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 1172fc7b3..82f61e79f 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -27,13 +27,14 @@
if _USE_POSIX:
from _posixshmem import shm_unlink
-import tractor
+# import msgspec
import numpy as np
-from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
+import tractor
from ..log import get_logger
from ._source import base_iohlc_dtype
+from .types import Struct
log = get_logger(__name__)
@@ -107,15 +108,12 @@ def destroy(self) -> None:
log.warning(f'Shm for {name} already unlinked?')
-class _Token(BaseModel):
+class _Token(Struct, frozen=True):
'''
Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry.
'''
- class Config:
- frozen = True
-
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
@@ -126,17 +124,22 @@ def dtype(self) -> np.dtype:
return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self):
- return self.dict()
+ return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token):
return msg
+ # TODO: native struct decoding
+ # return _token_dec.decode(msg)
+
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
+# _token_dec = msgspec.msgpack.Decoder(_Token)
+
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
@@ -167,7 +170,7 @@ def _make_token(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
- dtype_descr=np.dtype(dtype).descr
+ dtype_descr=tuple(np.dtype(dtype).descr)
)
diff --git a/piker/data/feed.py b/piker/data/feed.py
index 66ced72fc..2795535d9 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -42,7 +42,6 @@
import trimeter
import tractor
from tractor.trionics import maybe_open_context
-from pydantic import BaseModel
import pendulum
import numpy as np
@@ -59,6 +58,7 @@
ShmArray,
)
from .ingest import get_ingestormod
+from .types import Struct
from ._source import (
base_iohlc_dtype,
Symbol,
@@ -84,7 +84,7 @@
log = get_logger(__name__)
-class _FeedsBus(BaseModel):
+class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
@@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
a dedicated cancel scope.
'''
- class Config:
- arbitrary_types_allowed = True
- underscore_attrs_are_private = False
-
brokername: str
nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {}
diff --git a/piker/data/types.py b/piker/data/types.py
new file mode 100644
index 000000000..c6cba61d1
--- /dev/null
+++ b/piker/data/types.py
@@ -0,0 +1,68 @@
+# piker: trading gear for hackers
+# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+"""
+Built-in (extension) types.
+
+"""
+from typing import Optional
+from pprint import pformat
+
+import msgspec
+
+
+class Struct(
+ msgspec.Struct,
+
+ # https://jcristharif.com/msgspec/structs.html#tagged-unions
+ # tag='pikerstruct',
+ # tag=True,
+):
+ '''
+ A "human friendlier" (aka repl buddy) struct subtype.
+
+ '''
+ def to_dict(self) -> dict:
+ return {
+ f: getattr(self, f)
+ for f in self.__struct_fields__
+ }
+
+ def __repr__(self):
+ return f'Struct({pformat(self.to_dict())})'
+
+ def copy(
+ self,
+ update: Optional[dict] = None,
+
+ ) -> msgspec.Struct:
+ '''
+ Validate-typecast all self defined fields, return a copy of us
+ with all such fields.
+
+ This is kinda like the default behaviour in `pydantic.BaseModel`.
+
+ '''
+ if update:
+ for k, v in update.items():
+ setattr(self, k, v)
+
+ # roundtrip serialize to validate
+ return msgspec.msgpack.Decoder(
+ type=type(self)
+ ).decode(
+ msgspec.msgpack.Encoder().encode(self)
+ )
diff --git a/piker/ui/_event.py b/piker/ui/_event.py
index f9982843c..9c006dc86 100644
--- a/piker/ui/_event.py
+++ b/piker/ui/_event.py
@@ -21,7 +21,6 @@
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable
-from pydantic import BaseModel
import trio
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal
@@ -30,6 +29,8 @@
QGraphicsSceneMouseEvent as gs_mouse,
)
+from ..data.types import Struct
+
MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress,
@@ -43,13 +44,10 @@
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
-class KeyboardMsg(BaseModel):
+class KeyboardMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
- class Config:
- arbitrary_types_allowed = True
-
event: QEvent
etype: int
key: int
@@ -57,16 +55,13 @@ class Config:
txt: str
def to_tuple(self) -> tuple:
- return tuple(self.dict().values())
+ return tuple(self.to_dict().values())
-class MouseMsg(BaseModel):
+class MouseMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
- class Config:
- arbitrary_types_allowed = True
-
event: QEvent
etype: int
button: int
diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py
index c6d095944..f62363f3e 100644
--- a/piker/ui/_forms.py
+++ b/piker/ui/_forms.py
@@ -619,7 +619,7 @@ def set_slots(
# color: #19232D;
# width: 10px;
- self.setRange(0, slots)
+ self.setRange(0, int(slots))
self.setValue(value)
diff --git a/piker/ui/_orm.py b/piker/ui/_orm.py
index 67050e95e..8dea0b6d5 100644
--- a/piker/ui/_orm.py
+++ b/piker/ui/_orm.py
@@ -22,12 +22,9 @@
from typing import (
Optional, Generic,
TypeVar, Callable,
- Literal,
)
-import enum
-import sys
-from pydantic import BaseModel, validator
+# from pydantic import BaseModel, validator
from pydantic.generics import GenericModel
from PyQt5.QtWidgets import (
QWidget,
@@ -38,6 +35,7 @@
# FontScaledDelegate,
Edit,
)
+from ..data.types import Struct
DataType = TypeVar('DataType')
@@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType]
# value: DataType = None
- @validator('value') # , always=True)
+ # @validator('value') # , always=True)
def set_value_first(
cls,
@@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit
-class AllocatorPane(BaseModel):
+class AllocatorPane(Struct):
account = Selection[str](
options=dict.fromkeys(
diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index e9ed3499a..83a0ed485 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -27,7 +27,6 @@
from typing import Optional, Dict, Callable, Any
import uuid
-from pydantic import BaseModel
import tractor
import trio
from PyQt5.QtCore import Qt
@@ -41,6 +40,7 @@
from ._style import _font
from ..data._source import Symbol
from ..data.feed import Feed
+from ..data.types import Struct
from ..log import get_logger
from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine
@@ -58,7 +58,7 @@
log = get_logger(__name__)
-class OrderDialog(BaseModel):
+class OrderDialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {}
- class Config:
- arbitrary_types_allowed = True
- underscore_attrs_are_private = False
-
def on_level_change_update_next_order_info(
@@ -268,7 +264,8 @@ def submit_order(
self,
) -> OrderDialog:
- '''Send execution order to EMS return a level line to
+ '''
+ Send execution order to EMS return a level line to
represent the order on a chart.
'''
@@ -277,13 +274,9 @@ def submit_order(
oid = str(uuid.uuid4())
# format order data for ems
- fqsn = symbol.front_fqsn()
- order = staged.copy(
- update={
- 'symbol': fqsn,
- 'oid': oid,
- }
- )
+ order = staged.copy()
+ order.oid = oid
+ order.symbol = symbol.front_fqsn()
line = self.line_from_order(
order,
@@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
- log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
+ log.warning(
+ f'Order {oid} failed with:\n{pformat(broker_msg)}'
+ )
elif resp in (
'dark_triggered'
diff --git a/setup.py b/setup.py
index 9ae98dbb9..71cc078b1 100755
--- a/setup.py
+++ b/setup.py
@@ -47,12 +47,11 @@
'attrs',
'pygments',
'colorama', # numba traceback coloring
- 'pydantic', # structured data
+ 'msgspec', # performant IPC messaging and structs
# async
'trio',
'trio-websocket',
- 'msgspec', # performant IPC messaging
'async_generator',
# from github currently (see requirements.txt)