Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…s testing)
  • Loading branch information
Mouthwatering777 committed Sep 14, 2020
1 parent ecf25fb commit e6eb6cb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,26 @@ def __init__(self,
print(update_msg)
logging.warn(update_msg)

def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=None, stream_buffer_name=False):
def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=None, stream_buffer_name=False,
symbol=False):
"""
Create a list entry for new sockets
:param stream_id: provide a stream_id (only needed for userData Streams (acquiring a listenKey)
:type stream_id: uuid
:param channels: provide the channels to create the URI
:type channels: str, tuple, list, set
:param stream_label: provide a stream_label for the stream
:type stream_label: str
:param markets: provide the markets to create the URI
:type markets: str, tuple, list, set
:param stream_label: provide a stream_label for the stream
:type stream_label: str
:param stream_buffer_name: If `False` the data is going to get written to the default stream_buffer,
set to `True` to read the data via `pop_stream_data_from_stream_buffer(stream_id)` or
provide a string to create and use a shared stream_buffer and read it via
`pop_stream_data_from_stream_buffer('string')`.
:type stream_buffer_name: bool or str
:param symbol: provide the symbol for isolated_margin user_data streams
:type symbol: str
"""
self.stream_threading_lock[stream_id] = {'full_lock': threading.Lock(),
'receives_statistic_last_second_lock': threading.Lock()}
Expand All @@ -250,6 +258,7 @@ def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=
'markets': copy.deepcopy(markets),
'stream_label': copy.deepcopy(stream_label),
'stream_buffer_name': copy.deepcopy(stream_buffer_name),
'symbol': copy.deepcopy(symbol),
'subscriptions': 0,
'payload': [],
'api_key': copy.deepcopy(self.api_key),
Expand All @@ -275,11 +284,11 @@ def _add_socket_to_socket_list(self, stream_id, channels, markets, stream_label=
'processed_receives_statistic': {},
'transfer_rate_per_second': {'bytes': {}, 'speed': 0}}
logging.info("BinanceWebSocketApiManager->_add_socket_to_socket_list(" +
str(stream_id) + ", " + str(channels) + ", " + str(markets) + ", " + str(stream_label)
+ str(stream_buffer_name) + ")")
str(stream_id) + ", " + str(channels) + ", " + str(markets) + ", " + str(stream_label) + ", "
+ str(stream_buffer_name) + ", " + str(symbol) + ")")

def _create_stream_thread(self, loop, stream_id, channels, markets, stream_label=None, stream_buffer_name=False,
restart=False):
symbol=False, restart=False):
"""
Co function of self.create_stream to create a thread for the socket and to manage the coroutine
Expand All @@ -298,14 +307,16 @@ def _create_stream_thread(self, loop, stream_id, channels, markets, stream_label
provide a string to create and use a shared stream_buffer and read it via
`pop_stream_data_from_stream_buffer('string')`.
:type stream_buffer_name: bool or str
:param symbol: provide the symbol for isolated_margin user_data streams
:type symbol: str
:param restart: set to `True`, if its a restart!
:type restart: bool
:return:
"""
if self.is_stop_request(stream_id):
return False
if restart is False:
self._add_socket_to_socket_list(stream_id, channels, markets, stream_label, stream_buffer_name)
self._add_socket_to_socket_list(stream_id, channels, markets, stream_label, stream_buffer_name, symbol)
if stream_buffer_name is not False:
self.stream_buffer_locks[stream_buffer_name] = threading.Lock()
self.stream_buffers[stream_buffer_name] = []
Expand Down Expand Up @@ -751,7 +762,7 @@ def create_payload(self, stream_id, method, channels=False, markets=False):
str(markets) + ") finished ...")
return payload

def create_stream(self, channels, markets, stream_label=None, stream_buffer_name=False):
def create_stream(self, channels, markets, stream_label=None, stream_buffer_name=False, symbol=False):
"""
Create a websocket stream
Expand Down Expand Up @@ -809,6 +820,8 @@ def create_stream(self, channels, markets, stream_label=None, stream_buffer_name
provide a string to create and use a shared stream_buffer and read it via
`pop_stream_data_from_stream_buffer('string')`.
:type stream_buffer_name: bool or str
:param symbol: provide the symbol for isolated_margin user_data streams
:type symbol: str
:return: stream_id or 'False'
"""
# create a stream
Expand Down Expand Up @@ -836,11 +849,12 @@ def create_stream(self, channels, markets, stream_label=None, stream_buffer_name
elif self.is_exchange_type('cex'):
markets_new.append(str(market).lower())
logging.info("BinanceWebSocketApiManager->create_stream(" + str(channels) + ", " + str(markets_new) + ", "
+ str(stream_label) + str(stream_buffer_name) + ") with stream_id=" + str(stream_id))
+ str(stream_label) + ", " + str(stream_buffer_name) + ", " + str(symbol) + ") with stream_id="
+ str(stream_id))
loop = asyncio.new_event_loop()
thread = threading.Thread(target=self._create_stream_thread, args=(loop, stream_id, channels,
markets_new, stream_label,
stream_buffer_name))
stream_buffer_name, symbol))
thread.start()
return stream_id

Expand Down Expand Up @@ -1299,7 +1313,7 @@ def get_number_of_free_subscription_slots(self, stream_id):
free_slots = self.max_subscriptions_per_stream - self.stream_list[stream_id]['subscriptions']
return free_slots

def get_listen_key_from_restclient(self, stream_id, api_key, api_secret):
def get_listen_key_from_restclient(self, stream_id, api_key, api_secret, symbol=False):
"""
Get a new or cached (<30m) listen_key
Expand All @@ -1309,6 +1323,8 @@ def get_listen_key_from_restclient(self, stream_id, api_key, api_secret):
:type api_key: str
:param api_secret: provide a valid Binance API secret
:type api_secret: str
:param symbol: provide the symbol for isolated_margin user_data streams
:type symbol: str
:return: str or False
"""
if (self.stream_list[stream_id]['start_time'] + self.stream_list[stream_id]['listen_key_cache_time']) > \
Expand All @@ -1321,7 +1337,8 @@ def get_listen_key_from_restclient(self, stream_id, api_key, api_secret):
# no cached listen_key or listen_key is older than 30 min
# acquire a new listen_key:
binance_websocket_api_restclient = BinanceWebSocketApiRestclient(self.exchange, api_key, api_secret,
self.get_version(), self.binance_api_status)
self.get_version(), self.binance_api_status,
symbol)
response = binance_websocket_api_restclient.get_listen_key()
del binance_websocket_api_restclient
if response:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@

class BinanceWebSocketApiRestclient(object):
def __init__(self, exchange, binance_api_key, binance_api_secret, unicorn_binance_websocket_api_version,
binance_api_status):
binance_api_status, symbol):
self.exchange = exchange
self.api_key = copy.deepcopy(binance_api_key)
self.api_secret = copy.deepcopy(binance_api_secret)
self.symbol = symbol
self.unicorn_binance_websocket_api_version = unicorn_binance_websocket_api_version
if self.exchange == "binance.com":
self.restful_base_uri = "https://api.binance.com/"
Expand Down Expand Up @@ -174,9 +175,12 @@ def get_listen_key(self):
:return: listen_key
:rtype: str or False
"""
logging.info("BinanceWebSocketApiRestclient->get_listen_key()")
logging.info("BinanceWebSocketApiRestclient->get_listen_key() symbol=" + str(self.symbol))
method = "post"
response = self._request(method, self.path_userdata)
if self.exchange == "binance.com-isolated_margin":
response = self._request(method, self.path_userdata, False, {'symbol': str(self.symbol)})
else:
response = self._request(method, self.path_userdata)
try:
self.listen_key = response['listenKey']
return response
Expand Down

0 comments on commit e6eb6cb

Please sign in to comment.