Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agg feedz #414

Merged
merged 49 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4000034
Only log pos size errors for `ib`
goodboy Nov 7, 2022
508de61
Drop duplicate live gateway from compose file for now
goodboy Nov 7, 2022
344a634
Always set fqsn in `Feed.symbols: dict`
goodboy Nov 7, 2022
2a9042b
Make all UI entrypoints accept an fqsn `list`
goodboy Nov 7, 2022
8e85ed9
Use new `GodWidget.load_symbols()` from search
goodboy Nov 7, 2022
051a872
EMS: expect fqsn key in `Feed.symbols`
goodboy Nov 7, 2022
1bf1965
Drop `tractor.log` level override fixture
goodboy Nov 7, 2022
c7d5db5
Start data feed layer test suite
goodboy Nov 7, 2022
5bf3cb8
Just warn on `ib` symbol search lags
goodboy Nov 8, 2022
18dc8b0
First draft aggregate feedz support
goodboy Nov 8, 2022
e7de540
Add `Symbol.fqsn: str` property
goodboy Nov 9, 2022
32b36aa
Expect init startup quotes from each symbol
goodboy Nov 9, 2022
25bfe6f
Use new |-union style type annots in sampling routines
goodboy Nov 9, 2022
bb6452b
Further feed syncing fixes wrt to `Flumes`
goodboy Nov 9, 2022
7daab63
Make `Symbol` derive from internal `.types.Struct`
goodboy Nov 10, 2022
2c4daf0
Adjust to per-fqsn-oriented `Flume` lookups throughout
goodboy Nov 10, 2022
8a01c9e
Fix broker-tail stripping using `str.removesuffix()`
goodboy Nov 10, 2022
29b6b3e
Port `storesh` cli-cmd machinery to `Flume` apis
goodboy Nov 10, 2022
36868bb
Add `kraken` test, ensure single broker-provider for now
goodboy Nov 10, 2022
8476d8d
Fix partial-frame-missing backfill logic
goodboy Nov 10, 2022
d6fb6fe
Just drop the pretty repr from our struct for now
goodboy Nov 11, 2022
81516c5
Finally fix tsdb -> shm backfill loading
goodboy Nov 11, 2022
20a3962
`Storage.read_ohlcv()` now returns a `numpy` array
goodboy Nov 11, 2022
7b9db86
Multi-`broker` quotes with `Feed.open_multi_stream()`
goodboy Nov 11, 2022
7bbe86d
Unpack broker mod and portal from fqsn for brokerd-trade-dialogs
goodboy Nov 11, 2022
e348968
Add multi-broker streaming test using both `binance` and `kraken`
goodboy Nov 11, 2022
0a959c1
Not all accounts will have API trade transactions this session..
goodboy Nov 11, 2022
ddbba76
Use (a new) `piker_pin` branch in `tractor` (again)
goodboy Nov 11, 2022
79fcbcc
Add an sdist job to CI
goodboy Nov 11, 2022
c088963
Always touch config file dir if dne
goodboy Nov 11, 2022
1e96ca3
Move `maybe_open_feed()` above for readability
goodboy Nov 12, 2022
f5cd63a
Ensure correct stream is set on each `Flume`
goodboy Nov 14, 2022
326f153
Catch overruns on throttled feed subs too
goodboy Nov 14, 2022
88870fd
Set `brokers: list[st]` from mods when not provided..
goodboy Nov 15, 2022
2a158ae
Rework `_FeedsBus` subscriptions mgmt using `set`
goodboy Nov 16, 2022
967e28b
Adjust built-in backend list to known working
goodboy Nov 17, 2022
c944db5
Revert "Fix `_main()` arg back to `sym: str`"
goodboy Nov 17, 2022
28fd795
Only require `-b <brokername>` for filtering
goodboy Nov 15, 2022
5d021ff
Bump up timeout on multi-feed test for CI
goodboy Nov 17, 2022
b7e1443
Use ETH on kraken to ensure enough quotes
goodboy Dec 10, 2022
f232d6d
Add `ci_env` detector fixture
goodboy Jan 9, 2023
76f920a
Always force lowercase on `binance` symbol keys
goodboy Dec 20, 2022
f6b7057
`binance`: always request an extra 1min OHLC bar
goodboy Jan 5, 2023
81585d9
Set global registry addr after first entry point spawns `pikerd`
goodboy Jan 9, 2023
008ae47
Reset `._registry_addr` to any passed in value from caller
goodboy Jan 9, 2023
0662210
Add a `open_test_pikerd()` acm fixture for easy booting of the servic…
goodboy Jan 9, 2023
593db0e
Only run `kraken` feed tests in CI, use `open_test_pikerd()`
goodboy Jan 9, 2023
55de9ab
Adjust cli mod imports of daemon sockaddr vars
goodboy Jan 9, 2023
963e5bd
Go back to `Feed.pause/resume()`, new flume APIs coming later
goodboy Jan 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,27 @@ on:

jobs:

# test that we can generate a software distribution and install it
# thus avoid missing file issues after packaging.
sdist-linux:
name: 'sdist'
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.10'

- name: Build sdist
run: python setup.py sdist --formats=zip

- name: Install sdist from .zips
run: python -m pip install dist/*.zip

testing:
name: 'install + test-suite'
runs-on: ubuntu-latest
Expand Down
62 changes: 31 additions & 31 deletions dockering/ib/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,39 +62,39 @@ services:
# - "127.0.0.1:4002:4002"
# - "127.0.0.1:5900:5900"

ib_gw_live:
image: waytrade/ib-gateway:1012.2i
restart: always
network_mode: 'host'
# ib_gw_live:
# image: waytrade/ib-gateway:1012.2i
# restart: always
# network_mode: 'host'

volumes:
- type: bind
source: ./jts_live.ini
target: /root/jts/jts.ini
# don't let ibc clobber this file for
# the main reason of not having a stupid
# timezone set..
read_only: true
# volumes:
# - type: bind
# source: ./jts_live.ini
# target: /root/jts/jts.ini
# # don't let ibc clobber this file for
# # the main reason of not having a stupid
# # timezone set..
# read_only: true

# force our own ibc config
- type: bind
source: ./ibc.ini
target: /root/ibc/config.ini
# # force our own ibc config
# - type: bind
# source: ./ibc.ini
# target: /root/ibc/config.ini

# force our noop script - socat isn't needed in host mode.
- type: bind
source: ./fork_ports_delayed.sh
target: /root/scripts/fork_ports_delayed.sh
# # force our noop script - socat isn't needed in host mode.
# - type: bind
# source: ./fork_ports_delayed.sh
# target: /root/scripts/fork_ports_delayed.sh

# force our noop script - socat isn't needed in host mode.
- type: bind
source: ./run_x11_vnc.sh
target: /root/scripts/run_x11_vnc.sh
read_only: true
# # force our noop script - socat isn't needed in host mode.
# - type: bind
# source: ./run_x11_vnc.sh
# target: /root/scripts/run_x11_vnc.sh
# read_only: true

# NOTE: to fill these out, define an `.env` file in the same dir as
# this compose file which looks something like:
environment:
TRADING_MODE: 'live'
VNC_SERVER_PASSWORD: 'doggy'
VNC_SERVER_PORT: '3004'
# # NOTE: to fill these out, define an `.env` file in the same dir as
# # this compose file which looks something like:
# environment:
# TRADING_MODE: 'live'
# VNC_SERVER_PASSWORD: 'doggy'
# VNC_SERVER_PORT: '3004'
35 changes: 27 additions & 8 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@

_root_dname = 'pikerd'

_registry_host: str = '127.0.0.1'
_registry_port: int = 6116
_registry_addr = (
_registry_host,
_registry_port,
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[str, int] = (
_default_registry_host,
_default_registry_port,
)

# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry_addr: tuple[str, int] | None = None

_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
Expand Down Expand Up @@ -152,13 +157,20 @@ async def open_pikerd(
'''
global _services
global _registry_addr

if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr

# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(

# passed through to ``open_root_actor``
arbiter_addr=registry_addr or _registry_addr,
arbiter_addr=_registry_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
Expand Down Expand Up @@ -197,7 +209,7 @@ async def open_piker_runtime(
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = _registry_addr,
registry_addr: None | tuple[str, int] = None,

) -> tractor.Actor:
'''
Expand All @@ -206,13 +218,20 @@ async def open_piker_runtime(
'''
global _services
global _registry_addr

if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr

# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(

# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
Expand Down
15 changes: 13 additions & 2 deletions piker/brokers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,21 @@

__brokers__ = [
'binance',
'questrade',
'robinhood',
'ib',
'kraken',

# broken but used to work
# 'questrade',
# 'robinhood',

# TODO: we should get on these stat!
# alpaca
# wstrade
# iex

# deribit
# kucoin
# bitso
]


Expand Down
52 changes: 27 additions & 25 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@
SymbolNotFound,
DataUnavailable,
)
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..log import (
get_logger,
get_console_log,
)
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..data._web_bs import (
open_autorecon_ws,
NoBsWs,
)

log = get_logger(__name__)

Expand Down Expand Up @@ -142,7 +147,9 @@ class OHLC(Struct):


# convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp(when):
def binance_timestamp(
when: datetime
) -> int:
return int((when.timestamp() * 1000) + (when.microsecond / 1000))


Expand Down Expand Up @@ -181,7 +188,7 @@ async def symbol_info(
params = {}

if sym is not None:
sym = sym.upper()
sym = sym.lower()
params = {'symbol': sym}

resp = await self._api(
Expand Down Expand Up @@ -238,7 +245,7 @@ async def bars(
) -> dict:

if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = pendulum.now('UTC').add(minutes=1)

if start_dt is None:
start_dt = end_dt.start_of(
Expand Down Expand Up @@ -396,8 +403,8 @@ async def open_history_client(

async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
end_dt: datetime | None = None,
start_dt: datetime | None = None,

) -> tuple[
np.ndarray,
Expand All @@ -412,25 +419,20 @@ async def get_ohlc(
start_dt=start_dt,
end_dt=end_dt,
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.breakpoint()

yield get_ohlc, {'erlangs': 3, 'rate': 3}
start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])

return array, start_dt, end_dt

async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('binance') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
yield get_ohlc, {'erlangs': 3, 'rate': 3}


async def stream_quotes(
Expand Down Expand Up @@ -465,7 +467,7 @@ async def stream_quotes(
si = sym_infos[sym] = syminfo.to_dict()
filters = {}
for entry in syminfo.filters:
ftype = entry.pop('filterType')
ftype = entry['filterType']
filters[ftype] = entry

# XXX: after manually inspecting the response format we
Expand Down
25 changes: 13 additions & 12 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

raise ValueError(
# log.error(
# raise ValueError(
log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
Expand Down Expand Up @@ -575,17 +575,18 @@ async def trades_dialogue(
# if new trades are detected from the API, prepare
# them for the ledger file and update the pptable.
if api_to_ledger_entries:
trade_entries = api_to_ledger_entries[acctid]
trade_entries = api_to_ledger_entries.get(acctid)

# write ledger with all new trades **AFTER**
# we've updated the `pps.toml` from the
# original ledger state! (i.e. this is
# currently done on exit)
ledger.update(trade_entries)
if trade_entries:
# write ledger with all new trades **AFTER**
# we've updated the `pps.toml` from the
# original ledger state! (i.e. this is
# currently done on exit)
ledger.update(trade_entries)

trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)

# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
Expand Down Expand Up @@ -883,7 +884,7 @@ async def deliver_trade_events(
# execdict.pop('acctNumber')

fill_msg = BrokerdFill(
# should match the value returned from
# NOTE: should match the value returned from
# `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
Expand Down
8 changes: 7 additions & 1 deletion piker/brokers/ib/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,13 @@ async def open_symbol_search(
stock_results = []

async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
try:
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return

stock_results.extend(results)

for i in range(10):
with trio.move_on_after(3) as cs:
Expand Down
Loading