Skip to content

Commit

Permalink
Drop remaining BaseModel api usage from rest of codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jul 9, 2022
1 parent b44e2d9 commit 2a99f7a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 62 deletions.
21 changes: 10 additions & 11 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue

client = _accounts2clients.get(account)
Expand All @@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue

if action in {'buy', 'sell'}:
Expand All @@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))

# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
Expand All @@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)

elif action == 'cancel':
Expand Down Expand Up @@ -559,7 +558,7 @@ async def open_stream(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)

if not all_positions and cids2pps:
raise RuntimeError(
Expand Down Expand Up @@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break

await ems_stream.send(msg.dict())
await ems_stream.send(msg)


async def deliver_trade_events(
Expand Down Expand Up @@ -743,7 +742,7 @@ async def deliver_trade_events(

broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

case 'fill':

Expand Down Expand Up @@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],

)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

# 2 cases:
# - fill comes first or
Expand Down Expand Up @@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)

case 'event':

Expand All @@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")

# msg.reqid = 'tws-' + str(-1 * reqid)

Expand Down
22 changes: 11 additions & 11 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
BrokerdPosition,
BrokerdStatus,
)
from pikerd.data.types import Struct
from piker.data.types import Struct
from . import log
from .api import (
Client,
Expand Down Expand Up @@ -110,7 +110,7 @@ async def handle_order_requests(
'https://github.com/pikers/piker/issues/299'
),

).dict())
))
continue

# validate
Expand All @@ -136,7 +136,7 @@ async def handle_order_requests(
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
)
else:
# TODO: handle multiple orders (cancels?)
Expand All @@ -161,7 +161,7 @@ async def handle_order_requests(
# account the made the order
account=order.account

).dict()
)
)

elif action == 'cancel':
Expand Down Expand Up @@ -189,7 +189,7 @@ async def handle_order_requests(
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
)

if not error:
Expand Down Expand Up @@ -217,7 +217,7 @@ async def handle_order_requests(
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
)

else: # order cancel success case.
Expand All @@ -230,7 +230,7 @@ async def handle_order_requests(
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
)
else:
log.error(f'Unknown order command: {request_msg}')
Expand Down Expand Up @@ -330,7 +330,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
position_msgs.append(msg)

await ctx.started(
(position_msgs, [acc_name])
Expand Down Expand Up @@ -408,7 +408,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
await ems_stream.send(fill_msg)

filled_msg = BrokerdStatus(
reqid=reqid,
Expand All @@ -432,7 +432,7 @@ async def trades_dialogue(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
await ems_stream.send(filled_msg)

# update ledger and position tracking
trans = await update_ledger(acctid, trades)
Expand Down Expand Up @@ -469,7 +469,7 @@ async def trades_dialogue(
# TODO
# currency=''
)
await ems_stream.send(pp_msg.dict())
await ems_stream.send(pp_msg)

case [
trades_msgs,
Expand Down
13 changes: 6 additions & 7 deletions piker/clearing/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ class OrderBook:

def send(
self,
msg: Order,
msg: Order | dict,

) -> dict:
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
return msg

def update(
Expand All @@ -73,9 +73,8 @@ def update(

) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd

Expand All @@ -88,7 +87,7 @@ def cancel(self, uuid: str) -> bool:
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)


_orders: OrderBook = None
Expand Down Expand Up @@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd['symbol'] == symbol_key:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
Expand Down
33 changes: 15 additions & 18 deletions piker/clearing/_ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
await brokerd_orders_stream.send(msg)

# mark this entry as having sent an order
# request. the entry will be replaced once the
Expand All @@ -247,14 +247,11 @@ async def clear_dark_triggers(

msg = Status(
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
resp=resp,
trigger_price=price,
broker_details={'name': broker},
cmd=cmd, # original request message

).dict()
brokerd_msg=cmd,
)

# remove exec-condition from set
log.info(f'removing pred for {oid}')
Expand Down Expand Up @@ -577,11 +574,11 @@ async def translate_and_relay_brokerd_events(

if name == 'position':

pos_msg = BrokerdPosition(**brokerd_msg).dict()
pos_msg = BrokerdPosition(**brokerd_msg)

# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker']
sym, broker = pos_msg.symbol, pos_msg.broker

relay.positions.setdefault(
# NOTE: translate to a FQSN!
Expand Down Expand Up @@ -672,7 +669,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid

# tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict())
await brokerd_trades_stream.send(entry)

# - the order is now active and will be mirrored in
# our book -> registered as live flow
Expand Down Expand Up @@ -712,7 +709,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel

resp = 'broker_errored'
broker_details = msg.dict()
broker_details = msg

# don't relay message to order requester client
# continue
Expand Down Expand Up @@ -747,7 +744,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status

# pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict()
broker_details = msg

elif name in (
'fill',
Expand All @@ -756,7 +753,7 @@ async def translate_and_relay_brokerd_events(

# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg.dict()
broker_details = msg

log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')

Expand All @@ -774,7 +771,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
)
except KeyError:
log.error(
Expand Down Expand Up @@ -846,7 +843,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}'
)

await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)

else:
# this might be a cancel for an order that hasn't been
Expand All @@ -868,7 +865,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
# de-register this client dialogue
router.dialogues.pop(oid)
Expand Down Expand Up @@ -923,7 +920,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)

# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
Expand Down Expand Up @@ -1003,7 +1000,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)


Expand Down
Loading

0 comments on commit 2a99f7a

Please sign in to comment.