Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft storage layer cli #446

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions piker/data/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

"""
from functools import partial
from pprint import pformat
from pprint import (
pformat,
pprint,
)

from anyio_marketstore import open_marketstore_client
import trio
Expand Down Expand Up @@ -113,15 +116,11 @@ async def main():

@cli.command()
@click.option(
'--tl',
is_flag=True,
help='Enable tractor logging')
@click.option(
'--host',
'--tsdb_host',
default='localhost'
)
@click.option(
'--port',
'--tsdb_port',
default=5993
)
@click.argument('symbols', nargs=-1)
Expand All @@ -137,7 +136,7 @@ def storesh(
Start an IPython shell ready to query the local marketstore db.

'''
from piker.data.marketstore import tsdb_history_update
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime

async def main():
Expand All @@ -148,7 +147,63 @@ async def main():
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update(symbol)

async with open_tsdb_client(symbol) as storage:
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()

trio.run(main)


@cli.command()
@click.option(
'--host',
default='localhost'
)
@click.option(
'--port',
default=5993
)
@click.option(
'--delete',
'-d',
is_flag=True,
help='Delete history (1 Min) for symbol(s)',
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def storage(
config,
host,
port,
symbols: list[str],
delete: bool,

):
'''
Start an IPython shell ready to query the local marketstore db.

'''
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime

async def main():
nonlocal symbols

async with open_piker_runtime(
'tsdb_storage',
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
async with open_tsdb_client(symbol) as storage:
if delete:
for fqsn in symbols:
syms = await storage.client.list_symbols()
breakpoint()
await storage.delete_ts(fqsn, 60)
await storage.delete_ts(fqsn, 1)

trio.run(main)

Expand Down
23 changes: 9 additions & 14 deletions piker/data/marketstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ async def delete_ts(

client = self.client
syms = await client.list_symbols()
print(syms)
if key not in syms:
raise KeyError(f'`{key}` table key not found in\n{syms}?')

Expand Down Expand Up @@ -615,10 +614,10 @@ async def open_storage_client(
yield Storage(client)


async def tsdb_history_update(
fqsn: Optional[str] = None,

) -> list[str]:
@acm
async def open_tsdb_client(
fqsn: str,
) -> Storage:

# TODO: real-time dedicated task for ensuring
# history consistency between the tsdb, shm and real-time feed..
Expand Down Expand Up @@ -647,7 +646,7 @@ async def tsdb_history_update(
# - https://github.com/pikers/piker/issues/98
#
profiler = Profiler(
disabled=False, # not pg_profile_enabled(),
disabled=True, # not pg_profile_enabled(),
delayed=False,
)

Expand Down Expand Up @@ -688,14 +687,10 @@ async def tsdb_history_update(

# profiler('Finished db arrays diffs')

syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')

# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
syms = await storage.client.list_symbols()
# log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
# profiler(f'listed symbols {syms}')
yield storage

# for array in [to_append, to_prepend]:
# if array is None:
Expand Down