diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 7e6894653..888c9f1cc 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -39,13 +39,15 @@ ) import hmac import time -import decimal import hashlib from pathlib import Path import trio from trio_typing import TaskStatus -import pendulum +from pendulum import ( + now, + from_timestamp, +) import asks from fuzzywuzzy import process as fuzzy import numpy as np @@ -78,10 +80,10 @@ from ..clearing._messages import ( BrokerdOrder, BrokerdOrderAck, - # BrokerdCancel, - #BrokerdStatus, - #BrokerdPosition, - #BrokerdFill, + BrokerdStatus, + BrokerdPosition, + BrokerdFill, + BrokerdCancel, # BrokerdError, ) @@ -104,6 +106,7 @@ def get_config() -> dict: _url = 'https://api.binance.com' +_sapi_url = 'https://api.binance.com' _fapi_url = 'https://testnet.binancefuture.com' @@ -243,18 +246,25 @@ def __init__(self) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location: str = _url - # testnet EP sesh + # futes testnet rest EPs self._fapi_sesh = asks.Session(connections=4) self._fapi_sesh.base_location = _fapi_url + # sync rest API + self._sapi_sesh = asks.Session(connections=4) + self._sapi_sesh.base_location = _sapi_url + conf: dict = get_config() self.api_key: str = conf.get('api_key', '') self.api_secret: str = conf.get('api_secret', '') + self.watchlist = conf.get('watchlist', []) + if self.api_key: api_key_header = {'X-MBX-APIKEY': self.api_key} self._sesh.headers.update(api_key_header) self._fapi_sesh.headers.update(api_key_header) + self._sapi_sesh.headers.update(api_key_header) def _get_signature(self, data: OrderedDict) -> str: @@ -315,6 +325,25 @@ async def _fapi( return resproc(resp, log) + async def _sapi( + self, + method: str, + params: Union[dict, OrderedDict], + signed: bool = False, + action: str = 'get' + ) -> dict[str, Any]: + + if signed: + params['signature'] = self._get_signature(params) + + resp = await getattr(self._sapi_sesh, action)( + path=f'/sapi/v1/{method}', + params=params, + timeout=float('inf') + ) + + return resproc(resp, log) + async def exch_info( self, sym: str | None = None, @@ -397,7 +426,7 @@ async def bars( ) -> dict: if end_dt is None: - end_dt = pendulum.now('UTC').add(minutes=1) + end_dt = now('UTC').add(minutes=1) if start_dt is None: start_dt = end_dt.start_of( @@ -446,6 +475,58 @@ async def bars( array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array + async def get_positions( + self, + recv_window: int = 60000 + ) -> tuple: + positions = {} + volumes = {} + + for sym in self.watchlist: + log.info(f'doing {sym}...') + params = OrderedDict([ + ('symbol', sym), + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + resp = await self._api( + 'allOrders', + params=params, + signed=True + ) + log.info(f'done. len {len(resp)}') + await trio.sleep(3) + + return positions, volumes + + async def get_deposits( + self, + recv_window: int = 60000 + ) -> list: + + params = OrderedDict([ + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + return await self._sapi( + 'capital/deposit/hisrec', + params=params, + signed=True) + + async def get_withdrawls( + self, + recv_window: int = 60000 + ) -> list: + + params = OrderedDict([ + ('recvWindow', recv_window), + ('timestamp', binance_timestamp(now())) + ]) + return await self._sapi( + 'capital/withdraw/history', + params=params, + signed=True) + async def submit_limit( self, symbol: str, @@ -463,18 +544,8 @@ async def submit_limit( await self.cache_symbols() - asset_precision = self._pairs[symbol]['baseAssetPrecision'] - quote_precision = self._pairs[symbol]['quoteAssetPrecision'] - - quantity = Decimal(quantity).quantize( - Decimal(1 ** -asset_precision), - rounding=decimal.ROUND_HALF_EVEN - ) - - price = Decimal(price).quantize( - Decimal(1 ** -quote_precision), - rounding=decimal.ROUND_HALF_EVEN - ) + # asset_precision = self._pairs[symbol]['baseAssetPrecision'] + # quote_precision = self._pairs[symbol]['quoteAssetPrecision'] params = OrderedDict([ ('symbol', symbol), @@ -485,21 +556,21 @@ async def submit_limit( ('price', price), ('recvWindow', recv_window), ('newOrderRespType', 'ACK'), - ('timestamp', binance_timestamp(pendulum.now())) + ('timestamp', binance_timestamp(now())) ]) if oid: params['newClientOrderId'] = oid resp = await self._api( - 'order/test', # TODO: switch to real `order` endpoint + 'order', params=params, signed=True, action='post' ) - - assert resp['orderId'] == oid - return oid + log.info(resp) + # return resp['orderId'] + return resp['orderId'] async def submit_cancel( self, @@ -513,10 +584,10 @@ async def submit_cancel( ('symbol', symbol), ('orderId', oid), ('recvWindow', recv_window), - ('timestamp', binance_timestamp(pendulum.now())) + ('timestamp', binance_timestamp(now())) ]) - await self._api( + return await self._api( 'order', params=params, signed=True, @@ -524,11 +595,11 @@ async def submit_cancel( ) async def get_listen_key(self) -> str: - return await self._api( + return (await self._api( 'userDataStream', params={}, action='post' - )['listenKey'] + ))['listenKey'] async def keep_alive_key(self, listen_key: str) -> None: await self._fapi( @@ -559,7 +630,7 @@ async def periodic_keep_alive( key = await self.get_listen_key() async with trio.open_nursery() as n: - n.start_soon(periodic_keep_alive, key) + n.start_soon(periodic_keep_alive, self, key) yield key n.cancel_scope.cancel() @@ -730,8 +801,8 @@ async def get_ohlc( if (inow - times[-1]) > 60: await tractor.breakpoint() - start_dt = pendulum.from_timestamp(times[0]) - end_dt = pendulum.from_timestamp(times[-1]) + start_dt = from_timestamp(times[0]) + end_dt = from_timestamp(times[-1]) return array, start_dt, end_dt @@ -870,15 +941,15 @@ async def subscribe(ws: NoBsWs): # hz = 1/period if period else float('inf') # if hz > 60: # log.info(f'Binance quotez : {hz}') - - topic = msg['symbol'].lower() - await send_chan.send({topic: msg}) + + if typ == 'l1': + topic = msg['symbol'].lower() + await send_chan.send({topic: msg}) # last = time.time() async def handle_order_requests( - ems_order_stream: tractor.MsgStream, - symbol: str + ems_order_stream: tractor.MsgStream ) -> None: async with open_cached_client('binance') as client: async for request_msg in ems_order_stream: @@ -935,43 +1006,39 @@ async def trades_dialogue( # ledger: TransactionLedger # TODO: load pps and accounts using accounting apis! - # positions: dict = {} - # accounts: set[str] = set() - # await ctx.started((positions, {})) + positions: list[BrokerdPosition] = [] + accounts: list[str] = ['binance.default'] + await ctx.started((positions, accounts)) async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, open_cached_client('binance') as client, - # client.manage_listen_key() as listen_key, + client.manage_listen_key() as listen_key, ): n.start_soon(handle_order_requests, ems_stream) - await trio.sleep_forever() - + # await trio.sleep_forever() + async with open_autorecon_ws( f'wss://stream.binance.com:9443/ws/{listen_key}', ) as ws: event = await ws.recv_msg() + # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update if event.get('e') == 'executionReport': - """ - https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update - """ - - oid = event.get('c') - side = event.get('S').lower() - status = event.get('X') - order_qty = float(event.get('q')) - filled_qty = float(event.get('z')) - cumm_transacted_qty = float(event.get('Z')) - price_avg = cum_transacted_qty / filled_qty - - broker_time = float(event.get('T')) - - commission_amount = float(event.get('n')) - commission_asset = event.get('N') - if status == 'TRADE': + oid: str = event.get('c') + side: str = event.get('S').lower() + status: str = event.get('X') + order_qty: float = float(event.get('q')) + filled_qty: float = float(event.get('z')) + cum_transacted_qty: float = float(event.get('Z')) + price_avg: float = cum_transacted_qty / filled_qty + broker_time: float = float(event.get('T')) + commission_amount: float = float(event.get('n')) + commission_asset: float = event.get('N') + + if status == 'TRADE': if order_qty == filled_qty: msg = BrokerdFill( reqid=oid, @@ -990,7 +1057,7 @@ async def trades_dialogue( ) else: - if status == 'NEW': + if status == 'NEW': status = 'submitted' elif status == 'CANCELED':