Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tractor typed msg hackin #354

Draft
wants to merge 28 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ea5004c
First draft, working WS based order management
goodboy Jul 5, 2022
78a78f5
Use `match:` syntax in data feed subs processing
goodboy Jul 5, 2022
d41d140
Make ems relay loop report on brokerd `.reqid` changes
goodboy Jul 5, 2022
204d3b7
Get order "editing" working fully
goodboy Jul 5, 2022
9723d27
Drop uneeded count-sequencec verification
goodboy Jul 5, 2022
4d19c0f
Factor msg loop into new func: `handle_order_updates()`
goodboy Jul 5, 2022
e901547
Factor status handling into a new `process_status()` helper
goodboy Jul 5, 2022
65ff9a1
Update ledger *after* pps updates from new trades
goodboy Jul 5, 2022
ef36c5f
Don't require an ems msg symbol on error statuses
goodboy Jul 5, 2022
2c3307a
Try out a backend readme
goodboy Jul 5, 2022
de1b473
Add ledger and `pps.toml` snippets
goodboy Jul 5, 2022
92d24b7
Use our struct in binance backend
goodboy Jul 6, 2022
7ec95ec
Drop `pydantic` from service mngr
goodboy Jul 6, 2022
da54c80
Hard kill container on both a timeout or connection error
goodboy Jul 6, 2022
4660b57
Use our struct for kraken `Pair` type
goodboy Jul 6, 2022
5f24c57
Use struct for shm tokens
goodboy Jul 6, 2022
2664360
Remove `BaseModel` use from all dataclass-like uses
goodboy Jul 6, 2022
861826d
Lol, gotta `float()` that vlm before `*` XD
goodboy Jul 7, 2022
bf5fcfe
Fix missing container id, drop custom exception
goodboy Jul 7, 2022
4e9ff65
Finally solve the last-price-is-`nan` issue..
goodboy Jul 7, 2022
c10a85a
Add a custom `msgspec.Struct` with some humanizing
goodboy Jul 7, 2022
cfc08a5
Drop pydantic from allocator
goodboy Jul 8, 2022
c87704e
Cast slots to `int` before range set
goodboy Jul 8, 2022
6887d4d
Change all clearing msgs over to `msgspec`
goodboy Jul 8, 2022
583fa79
Add `Struct.copy()` which does a rountrip validate
goodboy Jul 8, 2022
de91c21
Drop remaining `BaseModel` api usage from rest of codebase
goodboy Jul 8, 2022
d31c38e
Mucking with custom `msgspec.Struct` codecs
goodboy Jul 7, 2022
869aa82
Import adjustments to allow msg codec overriding in `tractor`
goodboy Jul 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 43 additions & 39 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
from contextlib import asynccontextmanager as acm
from collections import defaultdict

from pydantic import BaseModel
import tractor
import trio
from trio_typing import TaskStatus
import tractor

from .log import get_logger, get_console_log
from .brokers import get_brokermod
from .data.types import Struct


log = get_logger(__name__)
Expand All @@ -47,16 +47,13 @@
]


class Services(BaseModel):
class Services(Struct):

actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}

class Config:
arbitrary_types_allowed = True

async def start_service_task(
self,
name: str,
Expand Down Expand Up @@ -207,23 +204,26 @@ async def open_piker_runtime(
assert _services is None

# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
with tractor.msg.configure_native_msgs(
[Struct],
):
async with (
tractor.open_root_actor(

# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,

# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()


@acm
Expand Down Expand Up @@ -263,27 +263,31 @@ async def maybe_open_pikerd(
if loglevel:
get_console_log(loglevel)

# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):

async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return

# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(

loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),

) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None


# brokerd enabled modules
Expand Down Expand Up @@ -445,7 +449,7 @@ async def spawn_brokerd(
)

# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
from .data.feed import _setup_persistent_brokerd

await _services.start_service_task(
dname,
Expand Down
18 changes: 11 additions & 7 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto

from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs

log = get_logger(__name__)
Expand Down Expand Up @@ -79,12 +79,14 @@


# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
class Pair(Struct, frozen=True):
symbol: str
status: str

baseAsset: str
baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
Expand Down Expand Up @@ -287,7 +289,7 @@ async def get_client() -> Client:


# validation type
class AggTrade(BaseModel):
class AggTrade(Struct):
e: str # Event type
E: int # Event time
s: str # Symbol
Expand Down Expand Up @@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:

elif msg.get('e') == 'aggTrade':

# validate
# NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg)

# TODO: type out and require this quote format
Expand All @@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
Expand Down Expand Up @@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()]
syminfo = Pair(**d) # validation

si = sym_infos[sym] = syminfo.dict()
si = sym_infos[sym] = syminfo.to_dict()

# XXX: after manually inspecting the response format we
# just directly pick out the info we need
Expand Down
21 changes: 10 additions & 11 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue

client = _accounts2clients.get(account)
Expand All @@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue

if action in {'buy', 'sell'}:
Expand All @@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))

# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
Expand All @@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)

elif action == 'cancel':
Expand Down Expand Up @@ -559,7 +558,7 @@ async def open_stream(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)

if not all_positions and cids2pps:
raise RuntimeError(
Expand Down Expand Up @@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break

await ems_stream.send(msg.dict())
await ems_stream.send(msg)


async def deliver_trade_events(
Expand Down Expand Up @@ -743,7 +742,7 @@ async def deliver_trade_events(

broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

case 'fill':

Expand Down Expand Up @@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],

)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

# 2 cases:
# - fill comes first or
Expand Down Expand Up @@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)

case 'event':

Expand All @@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")

# msg.reqid = 'tws-' + str(-1 * reqid)

Expand Down
64 changes: 64 additions & 0 deletions piker/brokers/kraken/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.

status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.


config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:

.. code:: toml

[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"


If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::

<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml


An example ledger file will have entries written verbatim from the
trade events schema:

.. code:: toml

[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""


your ``pps.toml`` file will have position entries like,

.. code:: toml

[kraken.spot."xmreur.kraken"]
size = 4.80907954
be_price = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]
1 change: 1 addition & 0 deletions piker/brokers/kraken/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ async def submit_limit(
"volume": str(size),
}
return await self.endpoint('AddOrder', data)

else:
# Edit order data for kraken api
data["txid"] = reqid
Expand Down
Loading