From 671b42933621167227012ef19059cca0a6f20616 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Mon, 27 Nov 2023 16:12:15 -0800 Subject: [PATCH] feat: astroport provider (#316) --- config/supported_assets.go | 3 + ojo-provider-config/currency-pairs.toml | 16 ++ oracle/oracle.go | 4 +- oracle/provider/astroport.go | 283 ++++++++++++++++++++++++ oracle/provider/astroport_test.go | 41 ++++ 5 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 oracle/provider/astroport.go create mode 100644 oracle/provider/astroport_test.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 5ac650da..160f83c3 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -26,6 +26,7 @@ var ( provider.ProviderPolygon: true, provider.ProviderEthUniswap: false, provider.ProviderKujira: false, + provider.ProviderAstroport: false, provider.ProviderMock: false, } @@ -40,11 +41,13 @@ var ( {Base: "ETH", Quote: "USD"}: {}, {Base: "ATOM", Quote: "USD"}: {}, {Base: "OSMO", Quote: "USD"}: {}, + {Base: "INJ", Quote: "USD"}: {}, {Base: "OSMO", Quote: "USDT"}: {}, {Base: "JUNO", Quote: "USDT"}: {}, {Base: "WETH", Quote: "USDC"}: {}, {Base: "WBTC", Quote: "WETH"}: {}, + {Base: "INJ", Quote: "USDT"}: {}, } SupportedUniswapCurrencies = map[string]struct{}{ diff --git a/ojo-provider-config/currency-pairs.toml b/ojo-provider-config/currency-pairs.toml index ef7a940a..982689e4 100644 --- a/ojo-provider-config/currency-pairs.toml +++ b/ojo-provider-config/currency-pairs.toml @@ -217,3 +217,19 @@ providers = [ "gate" ] quote = "USDT" + +[[currency_pairs]] +base = "INJ" +providers = [ + "gate", + "binance", + "mexc", +] +quote = "USDT" + +[[currency_pairs]] +base = "STINJ" +providers = [ + "astroport", +] +quote = "INJ" diff --git a/oracle/oracle.go b/oracle/oracle.go index a225a9c9..062c15e8 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -331,7 +331,6 @@ func (o *Oracle) GetComputedPrices( providerCandles types.AggregatedProviderCandles, providerPrices types.AggregatedProviderPrices, ) (types.CurrencyPairDec, error) { - conversionRates, err := CalcCurrencyPairRates( providerCandles, providerPrices, @@ -474,6 +473,9 @@ func NewProvider( case provider.ProviderEthUniswap: return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderAstroport: + return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...) } return nil, fmt.Errorf("provider %s not found", providerName) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go new file mode 100644 index 00000000..c70138ea --- /dev/null +++ b/oracle/provider/astroport.go @@ -0,0 +1,283 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/ojo-network/ojo/util/decmath" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +var _ Provider = (*AstroportProvider)(nil) + +const ( + ProviderAstroport = "astroport" + restURL = "https://markets-api.astroport.fi" + tickersURL = "/markets/cg/tickers" + assetsURL = "/markets/cmc/v1/assets" + pollInterval = 3 * time.Second +) + +type ( + AstroportProvider struct { + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + client *http.Client + priceStore + ctx context.Context + } + + // AstroportAssetResponse is the response from the Astroport assets endpoint. + AstroportAssetResponse struct { + BaseID string `json:"base_id"` + BaseName string `json:"base_name"` + BaseSymbol string `json:"base_symbol"` + QuoteID string `json:"quote_id"` + QuoteName string `json:"quote_name"` + QuoteSymbol string `json:"quote_symbol"` + LastPrice float64 `json:"last_price"` + BaseVolume float64 `json:"base_volume"` + QuoteVolume float64 `json:"quote_volume"` + USDVolume float64 `json:"USD_volume"` + } + // AstroportTickersResponse is the response from the Astroport tickers endpoint. + AstroportTickersResponse struct { + TickerID string `json:"ticker_id"` + BaseCurrency string `json:"base_currency"` + TargetCurrency string `json:"target_currency"` + LastPrice float64 `json:"last_price"` + LiquidityInUSD float64 `json:"liquidity_in_usd"` + BaseVolume float64 `json:"base_volume"` + TargetVolume float64 `json:"target_volume"` + PoolID string `json:"pool_id"` + } +) + +// NewAstroportProvider returns a new AstroportProvider. +// It also starts a go routine to poll for new data. +func NewAstroportProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*AstroportProvider, error) { + if (endpoints.Name) != ProviderAstroport { + endpoints = Endpoint{ + Name: ProviderAstroport, + Rest: restURL, + } + } + + astroLogger := logger.With().Str("provider", string(ProviderAstroport)).Logger() + + provider := &AstroportProvider{ + logger: astroLogger, + endpoints: endpoints, + priceStore: newPriceStore(astroLogger), + client: &http.Client{}, + ctx: ctx, + } + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + return provider, nil +} + +// GetAvailablePairs return all available pair symbols. +func (p *AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { + availablePairs, err := p.getAvailableAssets() + if err != nil { + return nil, err + } + + availableSymbols := map[string]struct{}{} + for _, pair := range availablePairs { + availableSymbols[pair.String()] = struct{}{} + } + + return availableSymbols, nil +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array. +func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newPairs := []types.CurrencyPair{} + for _, cp := range cps { + if _, ok := p.subscribedPairs[cp.String()]; !ok { + newPairs = append(newPairs, cp) + } + } + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + newPairs..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +// StartConnections begins the polling process for +// the astroport provider. +func (p *AstroportProvider) StartConnections() { + go func() { + p.logger.Debug().Msg("starting astroport polling...") + err := p.poll() + if err != nil { + p.logger.Err(err).Msg("astroport provider unable to poll new data") + } + }() +} + +// AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the +// corresponding pair. It satisfies the TickerPrice interface. +type AstroportTickerPairs struct { + ticker AstroportTickersResponse + pair types.CurrencyPair +} + +// toTickerPrice converts the AstroportTickerPairs to a TickerPrice. +// It satisfies the TickerPrice interface. +func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { + lp, err := decmath.NewDecFromFloat(atr.LastPrice) + if err != nil { + return types.TickerPrice{}, err + } + volume, err := decmath.NewDecFromFloat(atr.BaseVolume) + if err != nil { + return types.TickerPrice{}, err + } + return types.TickerPrice{ + Price: lp, + Volume: volume, + }, nil +} + +// setTickers queries the Astroport API for the latest tickers and updates the +// priceStore. +func (p *AstroportProvider) setTickers() error { + tickers, err := p.queryTickers() + if err != nil { + return err + } + for _, v := range tickers { + p.setTickerPair(v.ticker, v.pair.String()) + } + return nil +} + +// getAvailableAssets returns all available assets from the api. +// It returns a map of ticker IDs -> pairs. +func (p *AstroportProvider) getAvailableAssets() (map[string]types.CurrencyPair, error) { + res, err := p.client.Get(p.endpoints.Rest + assetsURL) + if err != nil { + return nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportAssets := []map[string]AstroportAssetResponse{} + if err := json.Unmarshal(bz, &astroportAssets); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + // convert the astroport assets to a map of ticker IDs -> pairs + availablePairs := map[string]types.CurrencyPair{} + for _, assetMap := range astroportAssets { + for tickerID, asset := range assetMap { + availablePairs[tickerID] = types.CurrencyPair{ + Base: strings.ToUpper(asset.BaseSymbol), + Quote: strings.ToUpper(asset.QuoteSymbol), + } + } + } + return availablePairs, nil +} + +// queryTickers returns the AstroportTickerPairs available from the API. +func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { + res, err := p.client.Get(p.endpoints.Rest + tickersURL) + if err != nil { + return nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportTickers := []AstroportTickersResponse{} + if err := json.Unmarshal(bz, &astroportTickers); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + availableAssets, err := p.getAvailableAssets() + if err != nil { + return nil, err + } + + // filter out tickers that we are not subscribed to + tickers := []AstroportTickerPairs{} + for tickerID, v := range availableAssets { + for _, ticker := range astroportTickers { + if ticker.TickerID == tickerID { + tickers = append(tickers, AstroportTickerPairs{ + ticker: ticker, + pair: v, + }) + } + } + } + return tickers, nil +} + +// This function periodically calls setTickers to update the priceStore. +func (p *AstroportProvider) poll() error { + for { + select { + case <-p.ctx.Done(): + return nil + + default: + p.logger.Debug().Msg("querying astroport api") + + err := p.setTickers() + if err != nil { + return err + } + + time.Sleep(pollInterval) + } + } +} diff --git a/oracle/provider/astroport_test.go b/oracle/provider/astroport_test.go new file mode 100644 index 00000000..27010067 --- /dev/null +++ b/oracle/provider/astroport_test.go @@ -0,0 +1,41 @@ +package provider + +import ( + "context" + "os" + "testing" + "time" + + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +// TestAstroportProvider_GetTickers tests the polling process. +// TODO: Make this more comprehensive. +// +// Ref: https://github.com/ojo-network/price-feeder/issues/317 +func TestAstroportProvider_GetTickers(t *testing.T) { + ctx := context.Background() + pairs := []types.CurrencyPair{{ + Base: "STINJ", + Quote: "INJ", + }} + p, err := NewAstroportProvider( + ctx, + zerolog.New(os.Stdout).With().Timestamp().Logger(), + Endpoint{}, + pairs..., + ) + require.NoError(t, err) + availPairs, err := p.GetAvailablePairs() + require.NoError(t, err) + require.NotEmpty(t, availPairs) + + p.StartConnections() + time.Sleep(2 * time.Second) + + res, err := p.GetTickerPrices(pairs...) + require.NoError(t, err) + require.NotEmpty(t, res) +}