Skip to content

Commit

Permalink
Don't pop zero pps from table in .dump_active()
Browse files Browse the repository at this point in the history
In order to avoid double transaction adds/updates and too-early-discard
of zero sized pps (like when trades are loaded from a backend broker but
were already added to a ledger or `pps.toml` prior) we now **don't** pop
such `Position` entries from the `.pps` table in order to keep each
position's clears table always in place. This avoids the edge case where
an entry was removed too early (due to zero size) but then duplicate
trade entries that were in that entrie's clears show up from the backend
and are entered into a new entry resulting in an incorrect size in a new
entry..We still only push non-net-zero entries to the `pps.toml`.

More fixes:
- return the updated set of `Positions` from `.lifo_update()`.
- return the full table set from `update_pps()`.
- use `PpTable.update_from_trans()` more throughout.
- always write the `pps.toml` on `open_pps()` exit.
- only return table from `load_pps_from_toml()`.
  • Loading branch information
goodboy committed Jul 21, 2022
1 parent 0ed830a commit c6edb9a
Showing 1 changed file with 53 additions and 42 deletions.
95 changes: 53 additions & 42 deletions piker/pp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
Union,
)

from msgspec import Struct
import pendulum
from pendulum import datetime, now
import tomli
Expand All @@ -45,6 +44,7 @@
from .clearing._messages import BrokerdPosition, Status
from .data._source import Symbol
from .log import get_logger
from .data.types import Struct

log = get_logger(__name__)

Expand Down Expand Up @@ -314,6 +314,8 @@ def update_from_trans(

pps = self.pps

updated: dict[str, Position] = {}

# lifo update all pps from records
for tid, r in trans.items():

Expand Down Expand Up @@ -358,7 +360,9 @@ def update_from_trans(
# track clearing data
pp.update(r)

return pps
updated[r.bsuid] = pp

return updated

def dump_active(
self,
Expand Down Expand Up @@ -393,16 +397,23 @@ def dump_active(
pp.minimize_clears()

if (
# "net-zero" is a "closed" position
pp.size == 0

# drop time-expired positions (normally derivatives)
# time-expired pps (normally derivatives) are "closed"
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
# for expired cases
pp.size = 0

# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
# NOTE: we DO NOT pop the pp here since it can still be
# used to check for duplicate clears that may come in as
# new transaction from some backend API and need to be
# ignored; the closed positions won't be written to the
# ``pps.toml`` since ``pp_entries`` above is what's
# written.
# closed_pp = pp_objs.pop(bsuid, None)
closed_pp = pp_objs.get(bsuid)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp

Expand Down Expand Up @@ -440,7 +451,9 @@ def update_pps(
'''
pps: dict[str, Position] = pps or {}
return PpTable(pps).update_from_trans(records)
table = PpTable(pps)
table.update_from_trans(records)
return table.pps


def load_trans_from_ledger(
Expand Down Expand Up @@ -629,7 +642,7 @@ def load_pps_from_toml(
# does a full refresh of pps from the available ledger.
update_from_ledger: bool = False,

) -> tuple[dict, dict[str, Position]]:
) -> tuple[PpTable, dict[str, str]]:
'''
Load and marshal to objects all pps from either an existing
``pps.toml`` config, or from scratch from a ledger file when
Expand All @@ -646,9 +659,7 @@ def load_pps_from_toml(
brokername,
acctid,
)
# TODO: just call `.update_from_trans()`?
ledger_pp_objs = update_pps(trans)
pp_objs.update(ledger_pp_objs)
table.update_from_trans(trans)

# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
Expand All @@ -662,15 +673,14 @@ def load_pps_from_toml(
acctid,
filter_by=reload_records,
)
ledger_pp_objs = update_pps(trans)
pp_objs.update(ledger_pp_objs)
table.update_from_trans(trans)

if not pp_objs:
if not table.pps:
log.warning(
f'No `pps.toml` values could be loaded {brokername}:{acctid}'
)

return table, table.conf, table.pps
return table, table.conf


@cm
Expand All @@ -687,6 +697,7 @@ def open_pps(
conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {})

pp_objs = {}
table = PpTable(pp_objs, conf=conf)

Expand Down Expand Up @@ -747,32 +758,33 @@ def open_pps(
clears=clears,
)

orig = pp_objs.copy()
# orig = pp_objs.copy()
try:
yield table
finally:
if orig != pp_objs:

# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')

pp_entries, closed_pp_objs = table.dump_active(brokername)
conf[brokername][acctid] = pp_entries

# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table

config.write(
conf,
'pps',
encoder=enc,
)
# breakpoint()
# if orig != table.pps:

# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')

pp_entries, closed_pp_objs = table.dump_active(brokername)
conf[brokername][acctid] = pp_entries

# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table

config.write(
conf,
'pps',
encoder=enc,
)


def update_pps_conf(
Expand Down Expand Up @@ -803,10 +815,7 @@ def update_pps_conf(
for tid, r in trade_records.items():
ledger_reload[r.bsuid] = r.fqsn

# this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position]

table, conf, pp_objs = load_pps_from_toml(
table, conf = load_pps_from_toml(
brokername,
acctid,
reload_records=ledger_reload,
Expand All @@ -817,7 +826,9 @@ def update_pps_conf(
if trade_records:
table.update_from_trans(trade_records)

# this maps `.bsuid` values to positions
pp_entries, closed_pp_objs = table.dump_active(brokername)
pp_objs: dict[Union[str, int], Position] = table.pps

conf[brokername][acctid] = pp_entries

Expand Down

0 comments on commit c6edb9a

Please sign in to comment.