Skip to content

Commit

Permalink
Merge branch '0.38/connector-altmarkets' into 0.38-altmarkets
Browse files Browse the repository at this point in the history
  • Loading branch information
TheHolyRoger committed Apr 23, 2021
2 parents 0e20c23 + 355ffe7 commit bdf1976
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,39 @@ def __init__(self, altmarkets_auth: AltmarketsAuth, trading_pairs: Optional[List
self._current_listen_key = None
self._listen_for_user_stream_task = None
self._last_recv_time: float = 0
self._ws: AltmarketsWebsocket = None
super().__init__()

@property
def last_recv_time(self) -> float:
return self._last_recv_time

@property
def is_connected(self):
return self._ws.is_connected if self._ws is not None else False

async def _listen_to_orders_trades_balances(self) -> AsyncIterable[Any]:
"""
Subscribe to active orders via web socket
"""

try:
ws = AltmarketsWebsocket(self._altmarkets_auth)
self._ws = AltmarketsWebsocket(self._altmarkets_auth)

await ws.connect()
await self._ws.connect()

await ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"])
await self._ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"])

async for msg in ws.on_message():
async for msg in self._ws.on_message():
# print(f"user msg: {msg}")
self._last_recv_time = time.time()
if msg is not None:
yield msg

except Exception as e:
raise e
finally:
await ws.disconnect()
await self._ws.disconnect()
await asyncio.sleep(5)

async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue) -> AsyncIterable[Any]:
Expand Down
4 changes: 3 additions & 1 deletion hummingbot/connector/exchange/altmarkets/altmarkets_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hmac
from datetime import datetime, timezone, timedelta
from typing import Dict, Any
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants


class AltmarketsAuth():
Expand Down Expand Up @@ -40,5 +41,6 @@ def get_headers(self) -> (Dict[str, Any]):
"X-Auth-Apikey": self.api_key,
"X-Auth-Nonce": nonce,
"X-Auth-Signature": signature,
"Content-Type": "application/json"
"Content-Type": "application/json",
"User-Agent": Constants.USER_AGENT
}
15 changes: 9 additions & 6 deletions hummingbot/connector/exchange/altmarkets/altmarkets_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
class Constants:
EXCHANGE_NAME = "altmarkets"
REST_URL = "https://v2.altmarkets.io/api/v2/peatio"
# WS_PRIVATE_URL = "wss://stream.crypto.com/v2/user"
WS_PRIVATE_URL = "wss://v2.altmarkets.io/api/v2/ranger/private"
# WS_PUBLIC_URL = "wss://stream.crypto.com/v2/market"
WS_PUBLIC_URL = "wss://v2.altmarkets.io/api/v2/ranger/public"

HBOT_BROKER_ID = "HBOT"

USER_AGENT = "HBOT_AMv2"

ENDPOINT = {
# Public Endpoints
"TIMESTAMP": "public/timestamp",
Expand All @@ -26,14 +26,15 @@ class Constants:
WS_SUB = {
"TRADES": "{trading_pair}.trades",
"ORDERS": "{trading_pair}.ob-inc",
"USER_ORDERS_TRADES": ['order', 'trade'],
"USER_ORDERS_TRADES": ['balance', 'order', 'trade'],

}

WS_METHODS = {
"ORDERS_SNAPSHOT": ".ob-snap",
"ORDERS_UPDATE": ".ob-inc",
"TRADES_UPDATE": ".trades",
"USER_BALANCES": "balance",
"USER_ORDERS": "order",
"USER_TRADES": "trade",
}
Expand All @@ -54,11 +55,13 @@ class Constants:

# Intervals
# Only used when nothing is received from WS
SHORT_POLL_INTERVAL = 5.0
# One minute should be fine since we request balance updates on order updates
LONG_POLL_INTERVAL = 60.0
SHORT_POLL_INTERVAL = 10.0
# Two minutes should be fine since we get balances via WS
LONG_POLL_INTERVAL = 120.0
# Two minutes should be fine for order status since we get these via WS
UPDATE_ORDER_STATUS_INTERVAL = 120.0
# We don't get many messages here if we're not updating orders so set this pretty high
USER_TRACKER_MAX_AGE = 300.0
# 10 minute interval to update trading rules, these would likely never change whilst running.
INTERVAL_TRADING_RULES = 600

Expand Down
25 changes: 17 additions & 8 deletions hummingbot/connector/exchange/altmarkets/altmarkets_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def start(self, clock: Clock, timestamp: float):
"""
This function is called automatically by the clock.
"""
if self._poll_notifier.is_set():
self._poll_notifier.clear()
super().start(clock, timestamp)

def stop(self, clock: Clock):
Expand Down Expand Up @@ -324,7 +326,7 @@ async def _api_request(self,
qs_params: dict = params if method.upper() == "GET" else None
req_params = ujson.dumps(params) if method.upper() == "POST" and params is not None else None
# Generate auth headers if needed.
headers: dict = {"Content-Type": "application/json"}
headers: dict = {"Content-Type": "application/json", "User-Agent": Constants.USER_AGENT}
if is_auth_required:
headers: dict = self._altmarkets_auth.get_headers()
# Build request coro
Expand Down Expand Up @@ -439,6 +441,7 @@ async def _create_order(self,
"side": trade_type.name.lower(),
"ord_type": order_type_str,
# "price": f"{price:f}",
"client_id": order_id,
"volume": f"{amount:f}",
}
if order_type is not OrderType.MARKET:
Expand Down Expand Up @@ -540,8 +543,9 @@ async def _execute_cancel(self, trading_pair: str, order_id: str) -> str:
raise
except AltmarketsAPIError as e:
errors_found = e.error_payload.get('errors', e.error_payload)
order_state = errors_found.get("state", None)
if order_state is None:
if isinstance(errors_found, dict):
order_state = errors_found.get("state", None)
if order_state is None or 'market.order.invaild_id_or_uuid' in errors_found:
self._order_not_found_records[order_id] = self._order_not_found_records.get(order_id, 0) + 1
if order_state in Constants.ORDER_STATES['CANCEL_WAIT'] or \
self._order_not_found_records.get(order_id, 0) >= self.ORDER_NOT_EXIST_CANCEL_COUNT:
Expand Down Expand Up @@ -683,8 +687,6 @@ def _process_order_message(self, order_msg: Dict[str, Any]):
order_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER)
try:
updated = tracked_order.update_with_order_update(order_msg)
# Call Update balances on every message to catch order create, fill and cancel.
safe_ensure_future(self._update_balances())
except Exception as e:
self.logger().error(f"Error in order update for {tracked_order.exchange_order_id}. Message: {order_msg}\n{e}")
traceback.print_exc()
Expand Down Expand Up @@ -728,14 +730,17 @@ async def _process_trade_message(self, trade_msg: Dict[str, Any]):
# Estimate fee
trade_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER)
updated = tracked_order.update_with_trade_update(trade_msg)
# Call Update balances on every message to catch order create, fill and cancel.
safe_ensure_future(self._update_balances())

if not updated:
return

await self._trigger_order_fill(tracked_order, trade_msg)

def _process_balance_message(self, balance_message: Dict[str, Any]):
asset_name = balance_message["currency"].upper()
self._account_available_balances[asset_name] = Decimal(str(balance_message["balance"]))
self._account_balances[asset_name] = Decimal(str(balance_message["locked"])) + Decimal(str(balance_message["balance"]))

async def _trigger_order_fill(self,
tracked_order: AltmarketsInFlightOrder,
update_msg: Dict[str, Any]):
Expand Down Expand Up @@ -812,7 +817,8 @@ def tick(self, timestamp: float):
"""
now = time.time()
poll_interval = (Constants.SHORT_POLL_INTERVAL
if now - self._user_stream_tracker.last_recv_time > 120.0
if not self._user_stream_tracker.is_connected
or now - self._user_stream_tracker.last_recv_time > Constants.USER_TRACKER_MAX_AGE
else Constants.LONG_POLL_INTERVAL)
last_tick = int(self._last_timestamp / poll_interval)
current_tick = int(timestamp / poll_interval)
Expand Down Expand Up @@ -857,6 +863,7 @@ async def _user_stream_event_listener(self):
async for event_message in self._iter_user_event_queue():
try:
event_methods = [
Constants.WS_METHODS["USER_BALANCES"],
Constants.WS_METHODS["USER_ORDERS"],
Constants.WS_METHODS["USER_TRADES"],
]
Expand All @@ -870,6 +877,8 @@ async def _user_stream_event_listener(self):
await self._process_trade_message(params)
elif method == Constants.WS_METHODS["USER_ORDERS"]:
self._process_order_message(params)
elif method == Constants.WS_METHODS["USER_BALANCES"]:
self._process_balance_message(params)
except asyncio.CancelledError:
raise
except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def update_with_order_update(self, order_update: Dict[str, Any]) -> bool:
if self.executed_amount_base <= s_decimal_0:
# No trades executed yet.
return False
trade_id = order_update["updated_at"]
trade_id = f"{order_update['id']}-{order_update['updated_at']}"
if trade_id in self.trade_id_set:
# trade already recorded
return False
Expand Down Expand Up @@ -149,11 +149,11 @@ def update_with_trade_update(self, trade_update: Dict[str, Any]) -> bool:
if self.executed_amount_base <= s_decimal_0:
# No trades executed yet.
return False
trade_id = trade_update["id"]
trade_id = f"{trade_update['order_id']}-{trade_update['created_at']}"
if trade_id in self.trade_id_set:
# trade already recorded
return False
trade_update["exchange_trade_id"] = trade_id
trade_update["exchange_trade_id"] = trade_update["id"]
self.trade_id_set.add(trade_id)
self.fee_paid += trade_update.get("trade_fee") * self.executed_amount_base
if not self.fee_asset:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def data_source(self) -> UserStreamTrackerDataSource:
)
return self._data_source

@property
def is_connected(self) -> float:
return self._data_source.is_connected if self._data_source is not None else False

@property
def exchange_name(self) -> str:
"""
Expand Down
20 changes: 13 additions & 7 deletions hummingbot/connector/exchange/altmarkets/altmarkets_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import random
import re
import ujson
from dateutil.parser import parse as dateparse
from typing import (
Any,
Expand Down Expand Up @@ -78,7 +79,7 @@ def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str:
quote = symbols[1].upper()
base_str = f"{base[0:4]}{base[-1]}"
quote_str = f"{quote[0:2]}{quote[-1]}"
return f"{Constants.HBOT_BROKER_ID}-{side}-{base_str}{quote_str}-{get_tracking_nonce()}"
return f"{Constants.HBOT_BROKER_ID}-{side}{base_str}{quote_str}{get_tracking_nonce()}"


def retry_sleep_time(try_count: int) -> float:
Expand All @@ -98,14 +99,19 @@ async def aiohttp_response_with_errors(request_coroutine):
request_errors = True
try:
parsed_response = await response.text('utf-8')
if len(parsed_response) < 1:
parsed_response = None
elif len(parsed_response) > 100:
parsed_response = f"{parsed_response[:100]} ... (truncated)"
try:
parsed_response = ujson.loads(parsed_response)
except Exception:
if len(parsed_response) < 1:
parsed_response = None
elif len(parsed_response) > 100:
parsed_response = f"{parsed_response[:100]} ... (truncated)"
except Exception:
pass
TempFailure = (parsed_response is None or
(response.status not in [200, 201] and "errors" not in parsed_response))
(response.status not in [200, 201] and
"errors" not in parsed_response and
"error" not in parsed_response))
if TempFailure:
parsed_response = response.reason if parsed_response is None else parsed_response
request_errors = True
Expand All @@ -120,7 +126,7 @@ async def api_call_with_retries(method,
shared_client=None,
try_count: int = 0) -> Dict[str, Any]:
url = f"{Constants.REST_URL}/{endpoint}"
headers = {"Content-Type": "application/json"}
headers = {"Content-Type": "application/json", "User-Agent": Constants.USER_AGENT}
http_client = shared_client if shared_client is not None else aiohttp.ClientSession()
# Build request coro
response_coro = http_client.request(method=method.upper(), url=url, headers=headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ def __init__(self, auth: Optional[AltmarketsAuth] = None):
self._client: Optional[websockets.WebSocketClientProtocol] = None
self._is_subscribed = False

@property
def is_connected(self):
return self._client.open if self._client is not None else False

@property
def is_subscribed(self):
return self._is_subscribed

# connect to exchange
async def connect(self):
extra_headers = self._auth.get_headers() if self._isPrivate else None
extra_headers = self._auth.get_headers() if self._isPrivate else {"User-Agent": Constants.USER_AGENT}
self._client = await websockets.connect(self._WS_URL, extra_headers=extra_headers)

return self._client
Expand Down
13 changes: 9 additions & 4 deletions test/connector/exchange/altmarkets/test_altmarkets_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from hummingbot.connector.exchange.altmarkets.altmarkets_websocket import AltmarketsWebsocket
from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants
from hummingbot.connector.exchange.altmarkets.altmarkets_utils import aiohttp_response_with_errors

sys.path.insert(0, realpath(join(__file__, "../../../../../")))
logging.basicConfig(level=METRICS_LOG_LEVEL)
Expand All @@ -29,9 +30,9 @@ async def rest_auth(self) -> Dict[Any, Any]:
endpoint = Constants.ENDPOINT['USER_BALANCES']
headers = self.auth.get_headers()
http_client = aiohttp.ClientSession()
response = await http_client.get(f"{Constants.REST_URL}/{endpoint}", headers=headers)
http_status, response, request_errors = await aiohttp_response_with_errors(http_client.request(method='GET', url=f"{Constants.REST_URL}/{endpoint}", headers=headers))
await http_client.close()
return await response.json()
return response, request_errors

async def ws_auth(self) -> Dict[Any, Any]:
ws = AltmarketsWebsocket(self.auth)
Expand All @@ -44,9 +45,13 @@ async def ws_auth(self) -> Dict[Any, Any]:
return False

def test_rest_auth(self):
result = self.ev_loop.run_until_complete(self.rest_auth())
result, errors = self.ev_loop.run_until_complete(self.rest_auth())
if errors:
reason = result.get('errors', result.get('error', result)) if isinstance(result, dict) else result
print(f"\nUnable to connect: {reason}")
assert errors is False
if len(result) == 0 or "currency" not in result[0].keys():
print(f"Unexpected response for API call: {result}")
print(f"\nUnexpected response for API call: {result}")
assert "currency" in result[0].keys()

def test_ws_auth(self):
Expand Down

0 comments on commit bdf1976

Please sign in to comment.