Skip to content

Commit

Permalink
Move maybe_open_feed() above for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Nov 12, 2022
1 parent 592523a commit df6df2e
Showing 1 changed file with 51 additions and 51 deletions.
102 changes: 51 additions & 51 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,57 @@ async def search(text: str) -> dict[str, Any]:
yield


@acm
async def maybe_open_feed(

fqsns: list[str],
loglevel: Optional[str] = None,

**kwargs,

) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
fqsn = fqsns[0]

async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),

# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,

) as (cache_hit, feed):

if cache_hit:
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use

async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
flume.stream = bstream

yield feed
else:
yield feed


@acm
async def open_feed(

Expand Down Expand Up @@ -1640,54 +1691,3 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals) == len(feed.streams)

yield feed


@acm
async def maybe_open_feed(

fqsns: list[str],
loglevel: Optional[str] = None,

**kwargs,

) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
fqsn = fqsns[0]

async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),

# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,

) as (cache_hit, feed):

if cache_hit:
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use

async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
flume.stream = bstream

yield feed
else:
yield feed

0 comments on commit df6df2e

Please sign in to comment.