Skip to content

Commit

Permalink
Fix pragma publisher (#80)
Browse files Browse the repository at this point in the history
* Hop for fetchers

* fix: remove unecessary function

* Refactors publishers

* format

* feat: more fetchers

* fix: typo

Co-authored-by: 0xevolve <[email protected]>

---------

Co-authored-by: 0xevolve <[email protected]>
  • Loading branch information
JordyRo1 and EvolveArt authored Feb 27, 2024
1 parent 35ebdec commit 54c5249
Show file tree
Hide file tree
Showing 8 changed files with 507 additions and 422 deletions.
158 changes: 80 additions & 78 deletions pragma/publisher/fetchers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,85 +28,37 @@ async def _fetch_pair(
self, asset: PragmaSpotAsset, session: ClientSession
) -> Union[SpotEntry, PublisherFetchError]:
pair = asset["pair"]
url = f"{self.BASE_URL}?symbol={pair[0]}{pair[1]}"

# For now still leaving this line,
if pair == ("STRK", "USD"):
pair = ("STRK", "USDT")
if pair == ("ETH", "STRK"):
url = f"{self.BASE_URL}?symbol=STRKUSDT"
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
result = await resp.json()
if "code" in result:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
eth_url = f"{self.BASE_URL}?symbol=ETHUSDT"
eth_resp = await session.get(eth_url)
eth_result = await eth_resp.json()
return self._construct(
asset,
result,
((float(eth_result["bidPrice"]) + float(eth_result["askPrice"])))
/ 2,
url = self.format_url(pair[0], pair[1])
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
else:
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
result = await resp.json()
if "code" in result:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
eth_url = f"{self.BASE_URL}?symbol=ETHUSDT"
eth_resp = await session.get(eth_url)
eth_result = await eth_resp.json()
return self._construct(asset, result)
result = await resp.json()
if "code" in result:
return await self.operate_usdt_hop(asset, session)
return self._construct(asset, result)

def _fetch_pair_sync(
self, asset: PragmaSpotAsset
) -> Union[SpotEntry, PublisherFetchError]:
pair = asset["pair"]
if pair == ("STRK", "USD"):
pair = ("STRK", "USDT")
if pair == ("ETH", "STRK"):
url = f"{self.BASE_URL}?symbol=STRKUSDT"
resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
result = resp.json()
if "code" in result:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
eth_url = f"{self.BASE_URL}?symbol=ETHUSDT"
eth_resp = requests.get(eth_url)
eth_result = eth_resp.json()
return self._construct(
asset,
result,
(float(eth_result["bidPrice"]) + float(eth_result["askPrice"])) / 2,
url = self.format_url(pair[0], pair[1])
resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
else:
url = f"{self.BASE_URL}?symbol={pair[0]}{pair[1]}"
resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
result = resp.json()
if "code" in result:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance"
)
return self._construct(asset, result)
result = resp.json()
if "code" in result:
return self.operate_usdt_hop_sync(asset)
return self._construct(asset, result)

async def fetch(
self, session: ClientSession
Expand Down Expand Up @@ -134,18 +86,68 @@ def format_url(self, quote_asset, base_asset):
url = f"{self.BASE_URL}?symbol={quote_asset}{base_asset}"
return url

def _construct(self, asset, result, eth_price=None) -> SpotEntry:
async def operate_usdt_hop(self, asset, session) -> SpotEntry:
pair = asset["pair"]
url_pair1 = self.format_url(asset["pair"][0], "USDT")
async with session.get(url_pair1) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[0]}"
)
pair1_usdt = await resp.json()
if "code" in pair1_usdt:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[0]}"
)
url_pair2 = self.format_url(asset["pair"][1], "USDT")
async with session.get(url_pair2) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[1]}"
)
pair2_usdt = await resp.json()
if "code" in pair2_usdt:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[1]}"
)
return self._construct(asset, pair2_usdt, pair1_usdt)

def operate_usdt_hop_sync(self, asset) -> float:
pair = asset["pair"]
url_pair1 = self.format_url(asset["pair"][0], "USDT")
resp = requests.get(url_pair1)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[0]}"
)
pair1_usdt = resp.json()
if "code" in pair1_usdt:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[0]}"
)
url2 = self.format_url(asset["pair"][1], "USDT")
resp2 = requests.get(url2)
if resp2.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[1]}"
)
pair2_usdt = resp2.json()
if "code" in pair2_usdt:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Binance - hop failed for {pair[1]}"
)
return self._construct(asset, pair2_usdt, pair1_usdt)

if pair == ("ETH", "STRK"):
bid = float(result["bidPrice"])
ask = float(result["askPrice"])
strk_price = (bid + ask) / 2
price = eth_price / strk_price
else:
bid = float(result["bidPrice"])
ask = float(result["askPrice"])
price = (bid + ask) / 2
def _construct(self, asset, result, hop_result=None) -> SpotEntry:
pair = asset["pair"]
bid = float(result["bidPrice"])
ask = float(result["askPrice"])
price = (bid + ask) / 2
if hop_result is not None:
hop_bid = float(hop_result["bidPrice"])
hop_ask = float(hop_result["askPrice"])
hop_price = (hop_bid + hop_ask) / 2
price = hop_price / price
timestamp = int(time.time())
price_int = int(price * (10 ** asset["decimals"]))
pair_id = currency_pair_to_pair_id(*pair)
Expand Down
178 changes: 84 additions & 94 deletions pragma/publisher/fetchers/bybit.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,94 +30,32 @@ async def _fetch_pair(
pair = asset["pair"]
if pair == ("STRK", "USD"):
pair = ("STRK", "USDT")
if pair == ("ETH", "STRK"):
url = f"{self.BASE_URL}symbol=STRKUSDT"
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
result = await resp.json()
if result["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
eth_url = f"{self.BASE_URL}symbol=ETHUSDT"
eth_resp = await session.get(eth_url)
eth_result = await eth_resp.json()
return self._construct(
asset,
result,
(
(
float(eth_result["result"]["list"][0]["bid1Price"])
+ float(eth_result["result"]["list"][0]["ask1Price"])
)
)
/ 2,
url = self.format_url(pair[0], pair[1])
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
else:
url = f"{self.BASE_URL}symbol={pair[0]}{pair[1]}"
async with session.get(url) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
result = await resp.json()
if result["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)

return self._construct(asset, result)
result = await resp.json()
if result["retCode"] == 10001:
return await self.operate_usdt_hop(asset, session)
return self._construct(asset, result)

def _fetch_pair_sync(
self, asset: PragmaSpotAsset
) -> Union[SpotEntry, PublisherFetchError]:
pair = asset["pair"]
if pair == ("STRK", "USD"):
pair = ("STRK", "USDT")
if pair == ("ETH", "STRK"):
url = f"{self.BASE_URL}symbol=STRKUSDT"
resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
result = resp.json()
if result["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
eth_url = f"{self.BASE_URL}symbol=ETHUSDT"
eth_resp = requests.get(eth_url)
eth_result = eth_resp.json()
return self._construct(
asset,
result,
(
(
float(eth_result["result"]["list"][0]["bid1Price"])
+ float(eth_result["result"]["list"][0]["ask1Price"])
)
)
/ 2,
)
else:
url = f"{self.BASE_URL}symbol={pair[0]}{pair[1]}"

resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)
result = resp.json()
if result["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit"
)

return self._construct(asset, result)
url = self.format_url(pair[0], pair[1])
resp = requests.get(url)
if resp.status_code == 404:
return PublisherFetchError(f"No data found for {'/'.join(pair)} from Bybit")
result = resp.json()
if result["retCode"] == 10001:
return self.operate_usdt_hop_sync(asset)
return self._construct(asset, result)

async def fetch(
self, session: ClientSession
Expand All @@ -140,27 +78,79 @@ def fetch_sync(self) -> List[Union[SpotEntry, PublisherFetchError]]:
return entries

def format_url(self, quote_asset, base_asset):
url = f"{self.BASE_URL}?symbol={quote_asset}/{base_asset}"
url = f"{self.BASE_URL}symbol={quote_asset}{base_asset}"
return url

def _construct(self, asset, result, eth_price=None) -> SpotEntry:
async def operate_usdt_hop(self, asset, session) -> SpotEntry:
pair = asset["pair"]
data = result["result"]["list"]
timestamp = int(time.time())
if pair == ("ETH", "STRK"):
ask = float(data[0]["ask1Price"])
bid = float(data[0]["bid1Price"])
price = (ask + bid) / 2.0
price_int = int((eth_price / price) * (10 ** asset["decimals"]))
else:
ask = float(data[0]["ask1Price"])
bid = float(data[0]["bid1Price"])
price = (ask + bid) / 2.0
price_int = int(price * (10 ** asset["decimals"]))
url_pair1 = self.format_url(asset["pair"][0], "USDT")
async with session.get(url_pair1) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[0]}"
)
pair1_usdt = await resp.json()
if pair1_usdt["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[0]}"
)
url2 = self.format_url(asset["pair"][1], "USDT")
async with session.get(url2) as resp:
if resp.status == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[1]}"
)
pair2_usdt = await resp.json()
if pair2_usdt["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[1]}"
)
return self._construct(asset, pair2_usdt, pair1_usdt)

def operate_usdt_hop_sync(self, asset) -> SpotEntry:
pair = asset["pair"]
url_pair1 = self.format_url(asset["pair"][0], "USDT")
resp = requests.get(url_pair1)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[0]}"
)
pair1_usdt = resp.json()
if pair1_usdt["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[0]}"
)
url_pair2 = self.format_url(asset["pair"][1], "USDT")
resp = requests.get(url_pair2)
if resp.status_code == 404:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[1]}"
)
pair2_usdt = resp.json()
if pair2_usdt["retCode"] == 10001:
return PublisherFetchError(
f"No data found for {'/'.join(pair)} from Bybit - hop failed for {pair[1]}"
)
return self._construct(asset, pair2_usdt, pair1_usdt)

def _construct(self, asset, result, hop_result=None) -> SpotEntry:
pair = asset["pair"]
bid = float(result["result"]["list"][0]["bid1Price"])
ask = float(result["result"]["list"][0]["ask1Price"])
price = (bid + ask) / 2
if hop_result is not None:
hop_bid = float(hop_result["result"]["list"][0]["bid1Price"])
hop_ask = float(hop_result["result"]["list"][0]["ask1Price"])
hop_price = (hop_bid + hop_ask) / 2
price = hop_price / price
timestamp = int(time.time())
price_int = int(price * (10 ** asset["decimals"]))
pair_id = currency_pair_to_pair_id(*pair)
volume = float(data[0]["volume24h"]) / 10 ** asset["decimals"]
volume = (
float(result["result"]["list"][0]["volume24h"]) if hop_result is None else 0
)
logger.info("Fetched price %d for %s from Bybit", price, "/".join(pair))

return SpotEntry(
pair_id=pair_id,
price=price_int,
Expand Down
Loading

0 comments on commit 54c5249

Please sign in to comment.