From 5ee1aaba37704d702b676e0a34223ce53f030ab2 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:31:59 -0800 Subject: [PATCH] feat: astroport provider --- config/supported_assets.go | 2 + oracle/oracle.go | 4 +- oracle/provider/astroport.go | 290 +++++++++++++++++++++++++++++++++++ 3 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 oracle/provider/astroport.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 5ac650da..2b15db4c 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -26,6 +26,7 @@ var ( provider.ProviderPolygon: true, provider.ProviderEthUniswap: false, provider.ProviderKujira: false, + provider.ProviderAstroport: false, provider.ProviderMock: false, } @@ -45,6 +46,7 @@ var ( {Base: "JUNO", Quote: "USDT"}: {}, {Base: "WETH", Quote: "USDC"}: {}, {Base: "WBTC", Quote: "WETH"}: {}, + {Base: "INJ", Quote: "USD"}: {}, } SupportedUniswapCurrencies = map[string]struct{}{ diff --git a/oracle/oracle.go b/oracle/oracle.go index a225a9c9..062c15e8 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -331,7 +331,6 @@ func (o *Oracle) GetComputedPrices( providerCandles types.AggregatedProviderCandles, providerPrices types.AggregatedProviderPrices, ) (types.CurrencyPairDec, error) { - conversionRates, err := CalcCurrencyPairRates( providerCandles, providerPrices, @@ -474,6 +473,9 @@ func NewProvider( case provider.ProviderEthUniswap: return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderAstroport: + return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...) } return nil, fmt.Errorf("provider %s not found", providerName) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go new file mode 100644 index 00000000..2c57054e --- /dev/null +++ b/oracle/provider/astroport.go @@ -0,0 +1,290 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/ojo-network/ojo/util/decmath" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +var _ Provider = (*AstroportProvider)(nil) + +const ( + ProviderAstroport = "astroport" + restURL = "https://markets-api.astroport.fi" + tickersURL = "/markets/cg/tickers" + assetsURL = "/markets/cmc/v1/assets" + pollInterval = 3 * time.Second +) + +type ( + AstroportProvider struct { + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + client *http.Client + priceStore + } + + AstroportAssetResponse struct { + BaseID string `json:"base_id"` + BaseName string `json:"base_name"` + BaseSymbol string `json:"base_symbol"` + QuoteID string `json:"quote_id"` + QuoteName string `json:"quote_name"` + QuoteSymbol string `json:"quote_symbol"` + LastPrice float64 `json:"last_price"` + BaseVolume float64 `json:"base_volume"` + QuoteVolume float64 `json:"quote_volume"` + USDVolume float64 `json:"USD_volume"` + } + + AstroportTickersResponse struct { + TickerID string `json:"ticker_id"` + BaseCurrency string `json:"base_currency"` + TargetCurrency string `json:"target_currency"` + LastPrice float64 `json:"last_price"` + LiquidityInUSD float64 `json:"liquidity_in_usd"` + BaseVolume float64 `json:"base_volume"` + TargetVolume float64 `json:"target_volume"` + PoolID string `json:"pool_id"` + } + + AstroportTickerPairs struct { + ticker AstroportTickersResponse + pair types.CurrencyPair + } +) + +func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { + lp, err := decmath.NewDecFromFloat(atr.LastPrice) + if err != nil { + return types.TickerPrice{}, err + } + volume, err := decmath.NewDecFromFloat(atr.BaseVolume) + if err != nil { + return types.TickerPrice{}, err + } + return types.TickerPrice{ + Price: lp, + Volume: volume, + }, nil +} + +func NewAstroportProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*AstroportProvider, error) { + if (endpoints.Name) != ProviderAstroport { + endpoints = Endpoint{ + Name: ProviderAstroport, + Rest: restURL, + } + } + + astroLogger := logger.With().Str("provider", string(ProviderAstroport)).Logger() + + provider := &AstroportProvider{ + logger: astroLogger, + endpoints: endpoints, + priceStore: newPriceStore(astroLogger), + client: &http.Client{}, + } + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + go func() { + logger.Debug().Msg("starting ftx polling...") + err := provider.pollCache(ctx, pairs...) + if err != nil { + logger.Err(err).Msg("astroport provider unable to poll new data") + } + }() + + provider.setSubscribedPairs(confirmedPairs...) + + return provider, nil +} + +// GetTickerPrices returns the tickerPrices based on the provided pairs. +func (p AstroportProvider) setTickers() error { + tickers, err := p.queryTickers() + if err != nil { + return err + } + for _, v := range tickers { + p.setTickerPair(v.ticker, v.pair.String()) + } + return nil +} + +func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { + queryingPairs := p.subscribedPairs + _, pairToTickerIDMap, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + // map of ticker IDs -> pairs + tickerIDs := make(map[string]types.CurrencyPair, len(queryingPairs)) + for _, pair := range queryingPairs { + if tickerID, ok := pairToTickerIDMap[pair.String()]; ok { + tickerIDs[tickerID] = pair + } + } + return tickerIDs, nil +} + +func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { + res, err := p.client.Get(p.endpoints.Rest + assetsURL) + if err != nil { + return nil, nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportAssets := []map[string]AstroportAssetResponse{} + if err := json.Unmarshal(bz, &astroportAssets); err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + availablePairs := map[string]types.CurrencyPair{} + for _, assetMap := range astroportAssets { + for tickerID, asset := range assetMap { + availablePairs[tickerID] = types.CurrencyPair{ + Base: strings.ToUpper(asset.BaseSymbol), + Quote: strings.ToUpper(asset.QuoteSymbol), + } + } + } + + pairToTickerID := map[string]string{} + for tickerID, pair := range availablePairs { + pairToTickerID[pair.String()] = tickerID + } + + return availablePairs, pairToTickerID, nil +} + +func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { + res, err := p.client.Get(p.endpoints.Rest + tickersURL) + if err != nil { + return nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportTickers := []AstroportTickersResponse{} + if err := json.Unmarshal(bz, &astroportTickers); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + tickerMap, err := p.findTickersForPairs() + if err != nil { + return nil, err + } + tickers := []AstroportTickerPairs{} + for tickerID, v := range tickerMap { + for _, ticker := range astroportTickers { + if ticker.TickerID == tickerID { + tickers = append(tickers, AstroportTickerPairs{ + ticker: ticker, + pair: v, + }) + } + } + } + + return tickers, nil +} + +// GetAvailablePairs return all available pairs symbol to subscribe. +func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { + availablePairs, _, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + availableSymbols := map[string]struct{}{} + for _, pair := range availablePairs { + availableSymbols[pair.String()] = struct{}{} + } + + return availableSymbols, nil +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newPairs := []types.CurrencyPair{} + for _, cp := range cps { + if _, ok := p.subscribedPairs[cp.String()]; !ok { + newPairs = append(newPairs, cp) + } + } + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + newPairs..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +// This function periodically calls setTickers to update the priceStore. +func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { + for { + select { + case <-ctx.Done(): + return nil + + default: + p.logger.Debug().Msg("querying astroport api") + + err := p.setTickers() + if err != nil { + return err + } + + time.Sleep(pollInterval) + } + } +} + +// StartConnections starts the websocket connections. +func (p AstroportProvider) StartConnections() {}