From e6f13538b886b2de4090d56fdb91a2574cc04d8b Mon Sep 17 00:00:00 2001 From: 0xevolve Date: Tue, 5 Mar 2024 19:31:53 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20deviation=20+=20time=20threshold=20?= =?UTF-8?q?(#85)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: deviation + time threshold * fix: defillama query --- pragma/core/mixins/oracle.py | 39 ++++++++++++- pragma/core/types.py | 29 +++++++++- pragma/publisher/fetchers/bybit.py | 4 +- pragma/publisher/fetchers/defillama.py | 58 +++++-------------- pragma/publisher/fetchers/gecko.py | 37 +++++------- pragma/publisher/fetchers/propeller.py | 3 - pragma/publisher/fetchers/starknetamm.py | 4 +- pragma/publisher/future_fetchers/binance.py | 5 +- pragma/publisher/future_fetchers/bybit.py | 2 +- pragma/publisher/future_fetchers/okx.py | 2 +- pragma/publisher/types.py | 9 ++- pyproject.toml | 2 +- .../starknet_publisher/Dockerfile.dev | 9 +++ .../jobs/publishers/starknet_publisher/app.py | 32 +++++++--- 14 files changed, 145 insertions(+), 90 deletions(-) create mode 100644 stagecoach/jobs/publishers/starknet_publisher/Dockerfile.dev diff --git a/pragma/core/mixins/oracle.py b/pragma/core/mixins/oracle.py index c96ea943..ac4b2d16 100644 --- a/pragma/core/mixins/oracle.py +++ b/pragma/core/mixins/oracle.py @@ -1,7 +1,11 @@ +import asyncio import collections import logging +import time from typing import List, Optional +import aiohttp +import requests from deprecated import deprecated from starknet_py.contract import InvokeResult from starknet_py.net.account.account import Account @@ -9,7 +13,7 @@ from pragma.core.contract import Contract from pragma.core.entry import Entry, FutureEntry, SpotEntry -from pragma.core.types import AggregationMode, DataType, DataTypes +from pragma.core.types import ASSET_MAPPING, AggregationMode, DataType, DataTypes from pragma.core.utils import felt_to_str, str_to_felt logger = logging.getLogger(__name__) @@ -402,3 +406,36 @@ async def update_oracle( max_fee=max_fee, ) return invocation + + async def get_time_since_last_published(self, pair_id, publisher) -> int: + all_entries = await self.get_spot_entries(pair_id) + if len(all_entries) == 0: + return 1000000000 # arbitrary large number + + entries = [ + entry + for entry in all_entries + if entry.base.publisher == str_to_felt(publisher) + ] + max_timestamp = max([entry.base.timestamp for entry in entries]) + + diff = int(time.time()) - max_timestamp + + return diff + + async def get_current_price_deviation(self, pair_id) -> float: + current_data = await self.get_spot(pair_id) + current_price = current_data.price / 10**current_data.decimals + + # query defillama API for the current price + asset = ASSET_MAPPING.get(pair_id.split("/")[0]) + url = f"https://coins.llama.fi/prices/current/coingecko:{asset}" + resp = requests.get( + url, + headers={"Accepts": "application/json"}, + ) + json = resp.json() + price = json["coins"][f"coingecko:{asset}"]["price"] + + deviation = abs(price - current_price) / price + return deviation diff --git a/pragma/core/types.py b/pragma/core/types.py index bc01f5e1..bd1acb8b 100644 --- a/pragma/core/types.py +++ b/pragma/core/types.py @@ -2,7 +2,7 @@ import random from dataclasses import dataclass from enum import Enum, unique -from typing import List, Literal, Optional +from typing import Dict, List, Literal, Optional from starknet_py.net.full_node_client import FullNodeClient @@ -45,6 +45,33 @@ SEPOLIA: 393402133025997798000961, } +ASSET_MAPPING: Dict[str, str] = { + "ETH": "ethereum", + "BTC": "bitcoin", + "WBTC": "wrapped-bitcoin", + "SOL": "solana", + "AVAX": "avalanche-2", + "DOGE": "dogecoin", + "SHIB": "shiba-inu", + "TEMP": "tempus", + "DAI": "dai", + "USDT": "tether", + "USDC": "usd-coin", + "TUSD": "true-usd", + "BUSD": "binance-usd", + "BNB": "binancecoin", + "ADA": "cardano", + "XRP": "ripple", + "MATIC": "matic-network", + "AAVE": "aave", + "R": "r", + "LORDS": "lords", + "WSTETH": "wrapped-steth", + "UNI": "uniswap", + "LUSD": "liquity-usd", + "STRK": "starknet", +} + CHAIN_ID_TO_NETWORK = {v: k for k, v in CHAIN_IDS.items()} STARKSCAN_URLS = { diff --git a/pragma/publisher/fetchers/bybit.py b/pragma/publisher/fetchers/bybit.py index 9b026135..9a179865 100644 --- a/pragma/publisher/fetchers/bybit.py +++ b/pragma/publisher/fetchers/bybit.py @@ -147,7 +147,9 @@ def _construct(self, asset, result, hop_result=None) -> SpotEntry: price_int = int(price * (10 ** asset["decimals"])) pair_id = currency_pair_to_pair_id(*pair) volume = ( - float(result["result"]["list"][0]["volume24h"]) if hop_result is None else 0 + float(result["result"]["list"][0]["volume24h"]) / 10 ** asset["decimals"] + if hop_result is None + else 0 ) logger.info("Fetched price %d for %s from Bybit", price, "/".join(pair)) diff --git a/pragma/publisher/fetchers/defillama.py b/pragma/publisher/fetchers/defillama.py index 1ff10cbf..5135072d 100644 --- a/pragma/publisher/fetchers/defillama.py +++ b/pragma/publisher/fetchers/defillama.py @@ -1,45 +1,18 @@ import asyncio import logging -import os -from typing import Dict, List +from typing import List import requests from aiohttp import ClientSession from pragma.core.assets import PragmaAsset, PragmaSpotAsset from pragma.core.entry import SpotEntry +from pragma.core.types import ASSET_MAPPING from pragma.core.utils import currency_pair_to_pair_id from pragma.publisher.types import PublisherFetchError, PublisherInterfaceT logger = logging.getLogger(__name__) -ASSET_MAPPING: Dict[str, str] = { - "ETH": "ethereum", - "BTC": "bitcoin", - "WBTC": "wrapped-bitcoin", - "SOL": "solana", - "AVAX": "avalanche-2", - "DOGE": "dogecoin", - "SHIB": "shiba-inu", - "TEMP": "tempus", - "DAI": "dai", - "USDT": "tether", - "USDC": "usd-coin", - "TUSD": "true-usd", - "BUSD": "binance-usd", - "BNB": "binancecoin", - "ADA": "cardano", - "XRP": "ripple", - "MATIC": "matic-network", - "AAVE": "aave", - "R": "r", - "LORDS": "lords", - "WSTETH": "wrapped-steth", - "UNI": "uniswap", - "LUSD": "liquity-usd", - "STRK": "starknet", -} - class DefillamaFetcher(PublisherInterfaceT): BASE_URL: str = ( @@ -69,7 +42,7 @@ async def _fetch_pair( return PublisherFetchError( f"Unknown price pair, do not know how to query Coingecko for {pair[0]}" ) - if pair[1] != "USD": + if pair[1] != "USD": return await self.operate_usd_hop(asset, session) url = self.BASE_URL.format(pair_id=pair_id) @@ -84,9 +57,7 @@ async def _fetch_pair( f"No data found for {'/'.join(pair)} from Defillama" ) - return self._construct( - asset=asset, result=result - ) + return self._construct(asset=asset, result=result) def _fetch_pair_sync(self, asset: PragmaSpotAsset) -> SpotEntry: pair = asset["pair"] @@ -96,7 +67,7 @@ def _fetch_pair_sync(self, asset: PragmaSpotAsset) -> SpotEntry: f"Unknown price pair, do not know how to query Coingecko for {pair[0]}" ) if pair[1] != "USD": - return self.operate_usd_hop_sync(asset) + return self.operate_usd_hop_sync(asset) url = self.BASE_URL.format(pair_id=pair_id) resp = requests.get(url, headers=self.headers) if resp.status_code == 404: @@ -132,7 +103,7 @@ def format_url(self, quote_asset, base_asset): pair_id = ASSET_MAPPING.get(quote_asset) url = self.BASE_URL.format(pair_id=pair_id) return url - + async def operate_usd_hop(self, asset, session) -> SpotEntry: pair = asset["pair"] pair_id_1 = ASSET_MAPPING.get(pair[0]) @@ -158,14 +129,13 @@ async def operate_usd_hop(self, asset, session) -> SpotEntry: return PublisherFetchError( f"No data found for {'/'.join(pair)} from Defillama - usd hop failed for {pair[1]}" ) - result_quote= await resp.json() - print(result_quote) + result_quote = await resp.json() if not result_quote["coins"]: return PublisherFetchError( f"No data found for {'/'.join(pair)} from Defillama - usd hop failed for {pair[1]}" ) - return self._construct(asset, result_base,result_quote) - + return self._construct(asset, result_base, result_quote) + def operate_usd_hop_sync(self, asset) -> SpotEntry: pair = asset["pair"] pair_id_1 = ASSET_MAPPING.get(pair[0]) @@ -196,19 +166,19 @@ def operate_usd_hop_sync(self, asset) -> SpotEntry: return PublisherFetchError( f"No data found for {'/'.join(pair)} from Defillama - usd hop failed for {pair[1]}" ) - return self._construct(asset, result_base,result_quote) + return self._construct(asset, result_base, result_quote) - def _construct(self, asset, result, hop_result = None) -> SpotEntry: + def _construct(self, asset, result, hop_result=None) -> SpotEntry: pair = asset["pair"] - base_id= ASSET_MAPPING.get(pair[0]) + base_id = ASSET_MAPPING.get(pair[0]) quote_id = ASSET_MAPPING.get(pair[1]) pair_id = currency_pair_to_pair_id(*pair) timestamp = int(result["coins"][f"coingecko:{base_id}"]["timestamp"]) - if hop_result is not None: + if hop_result is not None: price = result["coins"][f"coingecko:{base_id}"]["price"] hop_price = hop_result["coins"][f"coingecko:{quote_id}"]["price"] price_int = int((price / hop_price) * (10 ** asset["decimals"])) - else: + else: price = result["coins"][f"coingecko:{base_id}"]["price"] price_int = int(price * (10 ** asset["decimals"])) diff --git a/pragma/publisher/fetchers/gecko.py b/pragma/publisher/fetchers/gecko.py index aca25b4e..b557347b 100644 --- a/pragma/publisher/fetchers/gecko.py +++ b/pragma/publisher/fetchers/gecko.py @@ -53,7 +53,7 @@ async def _fetch_pair( self, asset: PragmaSpotAsset, session: ClientSession ) -> SpotEntry: pair = asset["pair"] - if pair[1] != "USD" : + if pair[1] != "USD": return await self.operate_usd_hop(asset, session) pool = ASSET_MAPPING.get(pair[0]) if pool is None: @@ -81,8 +81,8 @@ async def _fetch_pair( def _fetch_pair_sync(self, asset: PragmaSpotAsset) -> SpotEntry: pair = asset["pair"] - if pair[1] != "USD" : - return self.operate_usd_hop_sync(asset) + if pair[1] != "USD": + return self.operate_usd_hop_sync(asset) pool = ASSET_MAPPING.get(pair[0]) if pool is None: return PublisherFetchError( @@ -106,7 +106,7 @@ def _fetch_pair_sync(self, asset: PragmaSpotAsset) -> SpotEntry: ) return self._construct(asset, result) - + async def fetch(self, session: ClientSession) -> List[SpotEntry]: entries = [] for asset in self.assets: @@ -129,7 +129,7 @@ def format_url(self, quote_asset, base_asset): pool = ASSET_MAPPING[quote_asset] url = self.BASE_URL.format(network=pool[0], token_address=pool[1]) return url - + async def operate_usd_hop(self, asset, session) -> SpotEntry: pair = asset["pair"] pool_1 = ASSET_MAPPING.get(pair[0]) @@ -154,9 +154,7 @@ async def operate_usd_hop(self, asset, session) -> SpotEntry: f"No data found for {'/'.join(pair)} from GeckoTerminal" ) - pair2_url = self.BASE_URL.format( - network=pool_2[0], token_address=pool_2[1] - ) + pair2_url = self.BASE_URL.format(network=pool_2[0], token_address=pool_2[1]) async with session.get(pair2_url, headers=self.headers) as resp2: if resp.status == 404: return PublisherFetchError( @@ -196,15 +194,13 @@ def operate_usd_hop_sync(self, asset) -> SpotEntry: f"No data found for {'/'.join(pair)} from GeckoTerminal" ) - pair2_url = self.BASE_URL.format( - network=pool_2[0], token_address=pool_2[1] - ) + pair2_url = self.BASE_URL.format(network=pool_2[0], token_address=pool_2[1]) resp2 = requests.get(pair2_url, headers=self.headers) if resp.status_code == 404: - return PublisherFetchError( - f"No data found for {'/'.join(pair)} from GeckoTerminal" - ) - hop_result = resp2.json() + return PublisherFetchError( + f"No data found for {'/'.join(pair)} from GeckoTerminal" + ) + hop_result = resp2.json() if ( result.get("errors") is not None and result["errors"][0]["title"] == "Not Found" @@ -214,16 +210,15 @@ def operate_usd_hop_sync(self, asset) -> SpotEntry: ) return self._construct(asset, hop_result, result) - - - def _construct(self, asset, result, hop_result = None) -> SpotEntry: + + def _construct(self, asset, result, hop_result=None) -> SpotEntry: pair = asset["pair"] data = result["data"]["attributes"] price = float(data["price_usd"]) - if hop_result is not None: + if hop_result is not None: hop_price = float(hop_result["data"]["attributes"]["price_usd"]) price_int = int(hop_price / price * 10 ** asset["decimals"]) - else: + else: price_int = int(price * (10 ** asset["decimals"])) volume = float(data["volume_usd"]["h24"]) / 10 ** asset["decimals"] @@ -241,5 +236,3 @@ def _construct(self, asset, result, hop_result = None) -> SpotEntry: publisher=self.publisher, volume=volume, ) - - diff --git a/pragma/publisher/fetchers/propeller.py b/pragma/publisher/fetchers/propeller.py index d471451a..f942ed8c 100644 --- a/pragma/publisher/fetchers/propeller.py +++ b/pragma/publisher/fetchers/propeller.py @@ -87,10 +87,7 @@ async def _fetch_pair( except PublisherFetchError as e: return e - print(payload) - async with session.post(url, headers=self.headers, json=payload) as resp: - print(resp) if resp.status == 404: return PublisherFetchError( f"No data found for {'/'.join(pair)} from Propeller" diff --git a/pragma/publisher/fetchers/starknetamm.py b/pragma/publisher/fetchers/starknetamm.py index b648982d..3f5d372c 100644 --- a/pragma/publisher/fetchers/starknetamm.py +++ b/pragma/publisher/fetchers/starknetamm.py @@ -15,9 +15,8 @@ # from starknet_py.net.full_node_client import FullNodeClient from pragma.core.client import PragmaClient from pragma.core.entry import SpotEntry -from pragma.core.types import PoolKey, get_client_from_network, get_rpc_url +from pragma.core.types import PoolKey from pragma.core.utils import currency_pair_to_pair_id, str_to_felt -from pragma.publisher.fetchers.defillama import DefillamaFetcher from pragma.publisher.types import PublisherFetchError, PublisherInterfaceT load_dotenv() @@ -218,4 +217,5 @@ def _construct(self, asset, result) -> SpotEntry: timestamp=int(time.time()), source=self.SOURCE, publisher=self.publisher, + volume=0, ) diff --git a/pragma/publisher/future_fetchers/binance.py b/pragma/publisher/future_fetchers/binance.py index 461698b8..57bc6e86 100644 --- a/pragma/publisher/future_fetchers/binance.py +++ b/pragma/publisher/future_fetchers/binance.py @@ -149,7 +149,10 @@ def _construct(self, asset, result, volume_arr) -> List[FutureEntry]: price = float(data["markPrice"]) price_int = int(price * (10 ** asset["decimals"])) pair_id = currency_pair_to_pair_id(*pair) - volume = float(self.retrieve_volume(data["symbol"], volume_arr)) + volume = ( + float(self.retrieve_volume(data["symbol"], volume_arr)) + / 10 ** asset["decimals"] + ) if data["symbol"] == f"{pair[0]}{pair[1]}": expiry_timestamp = 0 else: diff --git a/pragma/publisher/future_fetchers/bybit.py b/pragma/publisher/future_fetchers/bybit.py index 88ac7348..964a1f32 100644 --- a/pragma/publisher/future_fetchers/bybit.py +++ b/pragma/publisher/future_fetchers/bybit.py @@ -107,7 +107,7 @@ def _construct(self, asset, result) -> FutureEntry: price = float(data["lastPrice"]) price_int = int(price * (10 ** asset["decimals"])) pair_id = currency_pair_to_pair_id(*pair) - volume = float(data["volume24h"]) + volume = float(data["volume24h"]) / 10 ** asset["decimals"] expiry_timestamp = int(data["deliveryTime"]) logger.info("Fetched future for %s from BYBIT", ("/".join(pair))) diff --git a/pragma/publisher/future_fetchers/okx.py b/pragma/publisher/future_fetchers/okx.py index 2653a784..e8a009ca 100644 --- a/pragma/publisher/future_fetchers/okx.py +++ b/pragma/publisher/future_fetchers/okx.py @@ -156,7 +156,7 @@ def _construct(self, asset, data, expiry_timestamp) -> List[FutureEntry]: price = float(data["last"]) price_int = int(price * (10 ** asset["decimals"])) pair_id = currency_pair_to_pair_id(*pair) - volume = float(data["volCcy24h"]) + volume = float(data["volCcy24h"]) / 10 ** asset["decimals"] logger.info("Fetched future for %s from OKX", "/".join(pair)) return FutureEntry( diff --git a/pragma/publisher/types.py b/pragma/publisher/types.py index 81cb5f88..3926dbf5 100644 --- a/pragma/publisher/types.py +++ b/pragma/publisher/types.py @@ -8,13 +8,16 @@ # Abstract base class for all publishers class PublisherInterfaceT(abc.ABC): @abc.abstractmethod - async def fetch(self, session: ClientSession) -> List[Any]: ... + async def fetch(self, session: ClientSession) -> List[Any]: + ... @abc.abstractmethod - def fetch_sync(self) -> List[Any]: ... + def fetch_sync(self) -> List[Any]: + ... @abc.abstractmethod - def format_url(self, quote_asset, base_asset) -> str: ... + def format_url(self, quote_asset, base_asset) -> str: + ... async def _fetch(self): async with aiohttp.ClientSession() as session: diff --git a/pyproject.toml b/pyproject.toml index 402e1906..374013a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pragma-sdk" -version = "1.2.5" +version = "1.2.6" authors = ["0xevolve "] description = "Core package for rollup-native Pragma Oracle" readme = "README.md" diff --git a/stagecoach/jobs/publishers/starknet_publisher/Dockerfile.dev b/stagecoach/jobs/publishers/starknet_publisher/Dockerfile.dev new file mode 100644 index 00000000..6c305365 --- /dev/null +++ b/stagecoach/jobs/publishers/starknet_publisher/Dockerfile.dev @@ -0,0 +1,9 @@ +FROM public.ecr.aws/docker/library/python:3.9 + +RUN python -m pip install --upgrade pip + +RUN pip install pragma-sdk boto3 + +COPY app.py . + +CMD ["python", "app.py"] diff --git a/stagecoach/jobs/publishers/starknet_publisher/app.py b/stagecoach/jobs/publishers/starknet_publisher/app.py index dd76fcf8..08a43fda 100644 --- a/stagecoach/jobs/publishers/starknet_publisher/app.py +++ b/stagecoach/jobs/publishers/starknet_publisher/app.py @@ -11,13 +11,13 @@ from pragma.core.logger import get_stream_logger from pragma.publisher.client import PragmaPublisherClient from pragma.publisher.fetchers import ( - AscendexFetcher, + BinanceFetcher, BitstampFetcher, - CexFetcher, - CoinbaseFetcher, + BybitFetcher, DefillamaFetcher, GeckoTerminalFetcher, - KaikoFetcher, + HuobiFetcher, + KucoinFetcher, OkxFetcher, PropellerFetcher, StarknetAMMFetcher, @@ -36,11 +36,14 @@ FUTURE_ASSETS = os.environ["FUTURE_ASSETS"] PUBLISHER = os.environ.get("PUBLISHER") PUBLISHER_ADDRESS = int(os.environ.get("PUBLISHER_ADDRESS"), 16) -KAIKO_API_KEY = os.environ.get("KAIKO_API_KEY") PROPELLER_API_KEY = os.environ.get("PROPELLER_API_KEY") PAGINATION = os.environ.get("PAGINATION") RPC_URL = os.environ.get("RPC_URL") MAX_FEE = int(os.getenv("MAX_FEE", int(1e17))) + +DEVIATION_THRESHOLD = float(os.getenv("DEVIATION_THRESHOLD", 0.01)) +FREQUENCY_SECONDS = int(os.getenv("FREQUENCY_SECONDS", 60)) + if PAGINATION is not None: PAGINATION = int(PAGINATION) @@ -92,18 +95,30 @@ async def _handler(assets): chain_name=os.getenv("NETWORK"), ) + last_publish = await publisher_client.get_time_since_last_published( + "ETH/USD", PUBLISHER + ) + deviation = await publisher_client.get_current_price_deviation("ETH/USD") + print(f"Last publish was {last_publish} seconds ago and deviation is {deviation}") + if last_publish < FREQUENCY_SECONDS and deviation < DEVIATION_THRESHOLD: + print( + f"Last publish was {last_publish} seconds ago and deviation is {deviation}, skipping publish" + ) + return [] + publisher_client.add_fetchers( [ fetcher(assets, PUBLISHER) for fetcher in ( BitstampFetcher, - CexFetcher, - CoinbaseFetcher, - AscendexFetcher, DefillamaFetcher, OkxFetcher, GeckoTerminalFetcher, StarknetAMMFetcher, + HuobiFetcher, + KucoinFetcher, + BybitFetcher, + BinanceFetcher, BinanceFutureFetcher, OkxFutureFetcher, ByBitFutureFetcher, @@ -112,7 +127,6 @@ async def _handler(assets): ) publisher_client.add_fetcher(PropellerFetcher(assets, PUBLISHER, PROPELLER_API_KEY)) - publisher_client.add_fetcher(KaikoFetcher(assets, PUBLISHER, KAIKO_API_KEY)) _entries = await publisher_client.fetch() print(_entries)