Skip to content

Commit

Permalink
Avoid crash on trades ledger msgs
Browse files Browse the repository at this point in the history
Just ignore them for now using new `match:` syntax B)
but we'll do incremental update sooon!

Resolves #311
  • Loading branch information
goodboy committed Jul 2, 2022
1 parent 9106d13 commit fcd7e0f
Showing 1 changed file with 62 additions and 57 deletions.
119 changes: 62 additions & 57 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Trade(BaseModel):
def pack_positions(
acc: str,
trades: dict

) -> list[Any]:
positions: dict[str, float] = {}
vols: dict[str, float] = {}
Expand Down Expand Up @@ -104,8 +105,8 @@ def pack_positions(

async def handle_order_requests(

client: Client,
ems_order_stream: tractor.MsgStream,
client: Client,
ems_order_stream: tractor.MsgStream,

) -> None:

Expand Down Expand Up @@ -342,11 +343,13 @@ async def subscribe(ws: wsproto.WSConnection, token: str):
# TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)

# pull and deliver trades ledger
acc_name = 'kraken.' + client._name
trades = await client.get_trades()

log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
position_msgs = pack_positions(acc_name, trades)

await ctx.started((position_msgs, (acc_name,)))

# Get websocket token for authenticated data stream
Expand All @@ -355,74 +358,76 @@ async def subscribe(ws: wsproto.WSConnection, token: str):

# lol wtf is this..
assert resp['error'] == []

token = resp['result']['token']

async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)

# Process trades msg stream of ws
async with open_autorecon_ws(
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
async for msg in process_trade_msgs(ws):
for trade in msg:
# check the type of packaged message
assert type(trade) == Trade
) as ws,
trio.open_nursery() as n,
):
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)

# begin trade event processing
async for msg in process_trade_msgs(ws):
for trade in msg:
match trade:
# prepare and send a filled status update
filled_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),

account='kraken.spot',
status='filled',
filled=float(trade.size),
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
},

# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)

await ems_stream.send(filled_msg.dict())

# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),

action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)

await ems_stream.send(fill_msg.dict())
case Trade():
filled_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),

account=acc_name,
status='filled',
filled=float(trade.size),
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
},

# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())

# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),

action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)

await ems_stream.send(fill_msg.dict())

case _:
log.warning(f'Unhandled trades msg: {trade}')
await tractor.breakpoint()


async def process_trade_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
Parse and pack trades subscription messages, deliver framed
sequences of messages?
'''
sequence_counter = 0
Expand Down

0 comments on commit fcd7e0f

Please sign in to comment.