Skip to content

Commit

Permalink
Add real-time incremental pp updates
Browse files Browse the repository at this point in the history
Moves to using the new `piker.pp` apis to both store real-time trade
events in a ledger file as well emit position update msgs (which were
not in this backend at all prior) when new orders clear (aka fill).

In terms of outstanding issues,
- solves the pp update part of the bugs reported in #310
- starts a msg case block in prep for #293

Details of rework:
- move the `subscribe()` ws fixture to module level and `partial()` in
  the client token instead of passing it to the instance; in prep for
  removal of the `.token` attr from the `NoBsWs` wrapper.
- drop `make_auth_sub()` since it was too thin and we can just
  do it all succinctly in `subscribe()`
- filter trade update msgs to those not yet stored int the toml ledger
- much better kraken api msg unpacking using new `match:` synax B)

Resolves #311
  • Loading branch information
goodboy committed Jul 3, 2022
1 parent 214f864 commit 7846446
Showing 1 changed file with 149 additions and 90 deletions.
239 changes: 149 additions & 90 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
'''
from contextlib import asynccontextmanager as acm
from functools import partial
from itertools import chain
from pprint import pformat
import time
from typing import (
Expand Down Expand Up @@ -234,57 +236,59 @@ async def handle_order_requests(
log.error(f'Unknown order command: {request_msg}')


def make_auth_sub(data: dict[str, Any]) -> dict[str, str]:
@acm
async def subscribe(
ws: wsproto.WSConnection,
token: str,
subs: list[str] = ['ownTrades', 'openOrders'],
):
'''
Create a request subscription packet dict.
## TODO: point to the auth urls
Setup ws api subscriptions:
https://docs.kraken.com/websockets/#message-subscribe
By default we sign up for trade and order update events.
'''
# eg. specific logic for this in kraken's sync client:
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'event': 'subscribe',
'subscription': data,
}


@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:

# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)

@acm
async def subscribe(ws: wsproto.WSConnection, token: str):
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'ownTrades', 'token': token}
)
assert token
for sub in subs:
msg = {
'event': 'subscribe',
'subscription': {
'name': sub,
'token': token,
}
}

# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(trades_sub)
await ws.send_msg(msg)

yield
yield

for sub in subs:
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['ownTrades'],
'subscription': [sub],
})

# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# XXX: do we need to ack the unsub?
# await ws.recv_msg()


@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:

# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)

async with get_client() as client:

Expand Down Expand Up @@ -347,16 +351,17 @@ async def subscribe(ws: wsproto.WSConnection, token: str):
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
fixture=partial(
subscribe,
token=token,
),
) as ws,
trio.open_nursery() as n,
):
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)

count: int = 0
ledger_txids = {r.tid for r in trans}

# process and relay trades events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
Expand All @@ -367,62 +372,116 @@ async def subscribe(ws: wsproto.WSConnection, token: str):
'ownTrades',
{'sequence': seq},
]:
# ensure that we are only processing new trades
# XXX: do we actually need this orrr?
# ensure that we are only processing new trades?
assert seq > count
count += 1

for entries in trades_msgs:
for tid, msg in entries.items():

if tid in ledger_txids:
log.debug(f'Skipping ledgered {tid}:{msg}')
continue

# yield trade
reqid = msg['ordertxid']
action = msg['type']
price = float(msg['price'])
size = float(msg['vol'])
broker_time = float(msg['time'])

# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),

action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)

await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),

account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},

# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# flatten msgs for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()

# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():

# parse-cast
reqid = trade['ordertxid']
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])

# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),

action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())

filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),

account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},

# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())

# update ledger and position tracking
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)

# emit pp msgs
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',

# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,

# TODO
# currency=''
)
await ems_stream.send(pp_msg.dict())

case [
trades_msgs,
'openOrders',
{'sequence': seq},
]:
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')

case _:
log.warning(f'Unhandled trades msg: {msg}')
Expand Down Expand Up @@ -452,7 +511,7 @@ def norm_trade_records(
size=float(size),
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(record['time']),
dt=pendulum.from_timestamp(float(record['time'])),
bsuid=bsuid,

# XXX: there are no derivs on kraken right?
Expand Down

0 comments on commit 7846446

Please sign in to comment.