Skip to content

Commit

Permalink
Port kraken backend to new data feed api
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Mar 31, 2021
1 parent 4e52a99 commit 08f697f
Showing 1 changed file with 50 additions and 83 deletions.
133 changes: 50 additions & 83 deletions piker/brokers/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

"""
Kraken backend.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple
import json
import time

import trio_websocket
from trio_typing import TaskStatus
from trio_websocket._impl import (
ConnectionClosed,
DisconnectionTimeout,
Expand All @@ -41,15 +43,11 @@
from pydantic.dataclasses import dataclass
from pydantic import BaseModel


from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import (
_buffer,
# iterticks,
attach_shm_array,
get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data import ShmArray

log = get_logger(__name__)

Expand Down Expand Up @@ -315,6 +313,7 @@ def normalize(
quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
quote['last'] = quote['close']
quote['bar_wap'] = ohlc.vwap

# seriously eh? what's with this non-symmetry everywhere
# in subscription systems...
Expand Down Expand Up @@ -426,17 +425,37 @@ async def open_autorecon_ws(url):
await stack.aclose()


# @tractor.msg.pub
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa

count: int = 10, # NOTE: any more and we'll overrun the underlying buffer

task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)


async def stream_quotes(
# get_topics: Callable,
shm_token: Tuple[str, str, List[tuple]],
symbols: List[str] = ['XBTUSD', 'XMRUSD'],
# These are the symbols not expected by the ws api
# they are looked up inside this routine.
sub_type: str = 'ohlc',

send_chan: trio.abc.SendChannel,
symbols: List[str],
shm: ShmArray,
feed_is_live: trio.Event,
loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,

# backend specific
sub_type: str = 'ohlc',

# startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,

) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
Expand All @@ -447,7 +466,8 @@ async def stream_quotes(

ws_pairs = {}
sym_infos = {}
async with get_client() as client:

async with open_cached_client('kraken') as client:

# keep client cached for real-time section
for sym in symbols:
Expand All @@ -458,40 +478,16 @@ async def stream_quotes(
sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname

# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])

symbol = symbols[0]

if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)

shm.push(bars)
shm_token = shm.token

times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)

# yield shm_token, not writer_exists
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
symbol: {
'is_shm_writer': not writer_exists,
'shm_token': shm_token,
'symbol_info': sym_infos[sym],
}
# for sym in symbols
'shm_write_opts': {'sum_tick_vml': False},
},
}
yield init_msgs

async with open_autorecon_ws('wss://ws.kraken.com/') as ws:

Expand Down Expand Up @@ -521,15 +517,16 @@ async def stream_quotes(
# pull a first quote and deliver
msg_gen = stream_messages(ws)

# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()

topic, quote = normalize(ohlc_last)

# packetize as {topic: quote}
yield {topic: quote}
first_quote = {topic: quote}
task_status.started((init_msgs, first_quote))

# tell incrementer task it can start
_buffer.shm_incrementing(shm_token['shm_name']).set()
# lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set()

# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
Expand All @@ -546,15 +543,18 @@ async def stream_quotes(
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume

# new interval
# new OHLC sample interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume

else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume

ohlc_last = ohlc
last = ohlc.close

if tick_volume:
ohlc.ticks.append({
'type': 'trade',
Expand All @@ -564,43 +564,10 @@ async def stream_quotes(

topic, quote = normalize(ohlc)

# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume

if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last

# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc

elif typ == 'l1':
quote = ohlc
topic = quote['symbol']

# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote}
await send_chan.send({topic: quote})

0 comments on commit 08f697f

Please sign in to comment.