Skip to content

Commit

Permalink
kraken: use new OrderDialogs type, handle .spot
Browse files Browse the repository at this point in the history
Drop the older `dict[str, ChainMap]` prototype we had since the new
`OrderDialogs` built-out while adding `binance` order support is more
refined and general. Also, handle new and now expect `.spot` venue token
in FQMEs since kraken too has futes markets that we'll likely want to
support eventually.
  • Loading branch information
goodboy committed Jun 20, 2023
1 parent fbc15b9 commit caae84f
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
Order api and machinery
'''
from collections import ChainMap, defaultdict
from contextlib import (
asynccontextmanager as acm,
aclosing,
Expand Down Expand Up @@ -52,6 +51,9 @@
from piker.accounting._mktinfo import (
MktPair,
)
from piker.clearing import(
OrderDialogs,
)
from piker.clearing._messages import (
Order,
Status,
Expand Down Expand Up @@ -124,7 +126,7 @@ async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
apiflows: dict[int, ChainMap[dict[str, dict]]],
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: dict[int, str],

Expand Down Expand Up @@ -188,6 +190,7 @@ async def handle_order_requests(
try:
txid: str = reqids2txids[reqid]
except KeyError:

# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
Expand Down Expand Up @@ -221,7 +224,11 @@ async def handle_order_requests(
'type': order.action,
}

psym: str = order.symbol.upper()
# XXX strip any .<venue> token which should
# ONLY ever be '.spot' rn, until we support
# futes.
bs_fqme: str = order.symbol.rstrip('.spot')
psym: str = bs_fqme.upper()
pair: str = f'{psym[:3]}/{psym[3:]}'

# XXX: ACK the request **immediately** before sending
Expand Down Expand Up @@ -260,7 +267,7 @@ async def handle_order_requests(
await ws.send_msg(req)

# placehold for sanity checking in relay loop
apiflows[reqid].maps.append(msg)
apiflows.add_msg(reqid, msg)

case _:
account = msg.get('account')
Expand Down Expand Up @@ -440,10 +447,7 @@ async def open_trade_dialog(
acc_name = 'kraken.' + acctid

# task local msg dialog tracking
apiflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
apiflows = OrderDialogs()

# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
Expand Down Expand Up @@ -706,7 +710,7 @@ async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
apiflows: dict[int, ChainMap[dict[str, dict]]],
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: bidict[int, str],
table: PpTable,
Expand Down Expand Up @@ -921,7 +925,7 @@ async def handle_order_updates(
),
src='kraken',
)
apiflows[reqid].maps.append(status_msg.to_dict())
apiflows.add_msg(reqid, status_msg.to_dict())
await ems_stream.send(status_msg)
continue

Expand Down Expand Up @@ -1057,7 +1061,7 @@ async def handle_order_updates(
),
)

apiflows[reqid].maps.append(update_msg)
apiflows.add_msg(reqid, update_msg)
await ems_stream.send(resp)

# fill msg.
Expand Down Expand Up @@ -1136,9 +1140,8 @@ async def handle_order_updates(
)
continue

# update the msg chain
chain = apiflows[reqid]
chain.maps.append(event)
# update the msg history
apiflows.add_msg(reqid, event)

if status == 'error':
# any of ``{'add', 'edit', 'cancel'}``
Expand All @@ -1148,11 +1151,16 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)

symbol: str = 'N/A'
if chain := apiflows.get(reqid):
symbol: str = chain.get('symbol', 'N/A')

await ems_stream.send(BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=chain.get('symbol', 'N/A'),
symbol=symbol,

reason=f'Failed {action}:\n{errmsg}',
broker_details=event
Expand Down

0 comments on commit caae84f

Please sign in to comment.