diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go index 2c57054..57e7ec3 100644 --- a/oracle/provider/astroport.go +++ b/oracle/provider/astroport.go @@ -35,6 +35,7 @@ type ( priceStore } + // AstroportAssetResponse is the response from the Astroport assets endpoint. AstroportAssetResponse struct { BaseID string `json:"base_id"` BaseName string `json:"base_name"` @@ -47,7 +48,7 @@ type ( QuoteVolume float64 `json:"quote_volume"` USDVolume float64 `json:"USD_volume"` } - + // AstroportTickersResponse is the response from the Astroport tickers endpoint. AstroportTickersResponse struct { TickerID string `json:"ticker_id"` BaseCurrency string `json:"base_currency"` @@ -58,28 +59,10 @@ type ( TargetVolume float64 `json:"target_volume"` PoolID string `json:"pool_id"` } - - AstroportTickerPairs struct { - ticker AstroportTickersResponse - pair types.CurrencyPair - } ) -func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { - lp, err := decmath.NewDecFromFloat(atr.LastPrice) - if err != nil { - return types.TickerPrice{}, err - } - volume, err := decmath.NewDecFromFloat(atr.BaseVolume) - if err != nil { - return types.TickerPrice{}, err - } - return types.TickerPrice{ - Price: lp, - Volume: volume, - }, nil -} - +// NewAstroportProvider returns a new AstroportProvider. +// It also starts a go routine to poll for new data. func NewAstroportProvider( ctx context.Context, logger zerolog.Logger, @@ -125,7 +108,77 @@ func NewAstroportProvider( return provider, nil } -// GetTickerPrices returns the tickerPrices based on the provided pairs. +// GetAvailablePairs return all available pair symbols. +func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { + availablePairs, _, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + availableSymbols := map[string]struct{}{} + for _, pair := range availablePairs { + availableSymbols[pair.String()] = struct{}{} + } + + return availableSymbols, nil +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array. +func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newPairs := []types.CurrencyPair{} + for _, cp := range cps { + if _, ok := p.subscribedPairs[cp.String()]; !ok { + newPairs = append(newPairs, cp) + } + } + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + newPairs..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +// StartConnections starts the websocket connections. +// This function is a no-op for the astroport provider. +func (p AstroportProvider) StartConnections() {} + +// AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the +// corresponding pair. It satisfies the TickerPrice interface. +type AstroportTickerPairs struct { + ticker AstroportTickersResponse + pair types.CurrencyPair +} + +// toTickerPrice converts the AstroportTickerPairs to a TickerPrice. +// It satisfies the TickerPrice interface. +func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { + lp, err := decmath.NewDecFromFloat(atr.LastPrice) + if err != nil { + return types.TickerPrice{}, err + } + volume, err := decmath.NewDecFromFloat(atr.BaseVolume) + if err != nil { + return types.TickerPrice{}, err + } + return types.TickerPrice{ + Price: lp, + Volume: volume, + }, nil +} + +// setTickers queries the Astroport API for the latest tickers and updates the +// priceStore. func (p AstroportProvider) setTickers() error { tickers, err := p.queryTickers() if err != nil { @@ -137,6 +190,8 @@ func (p AstroportProvider) setTickers() error { return nil } +// findTickersForPairs returns a map of ticker IDs -> pairs, but filters out +// pairs that we are not subscribed to. func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { queryingPairs := p.subscribedPairs _, pairToTickerIDMap, err := p.getTickerMaps() @@ -154,6 +209,8 @@ func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, return tickerIDs, nil } +// getTickerMaps returns all available assets from the api. +// It returns a map of ticker IDs -> pairs and a map of pairs -> ticker IDs. func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { res, err := p.client.Get(p.endpoints.Rest + assetsURL) if err != nil { @@ -189,6 +246,7 @@ func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[s return availablePairs, pairToTickerID, nil } +// queryTickers returns the AstroportTickerPairs available from the API. func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { res, err := p.client.Get(p.endpoints.Rest + tickersURL) if err != nil { @@ -210,6 +268,7 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { if err != nil { return nil, err } + // filter out tickers that we are not subscribed to tickers := []AstroportTickerPairs{} for tickerID, v := range tickerMap { for _, ticker := range astroportTickers { @@ -225,47 +284,6 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { return tickers, nil } -// GetAvailablePairs return all available pairs symbol to subscribe. -func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { - availablePairs, _, err := p.getTickerMaps() - if err != nil { - return nil, err - } - - availableSymbols := map[string]struct{}{} - for _, pair := range availablePairs { - availableSymbols[pair.String()] = struct{}{} - } - - return availableSymbols, nil -} - -// SubscribeCurrencyPairs sends the new subscription messages to the websocket -// and adds them to the providers subscribedPairs array -func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { - p.mtx.Lock() - defer p.mtx.Unlock() - - newPairs := []types.CurrencyPair{} - for _, cp := range cps { - if _, ok := p.subscribedPairs[cp.String()]; !ok { - newPairs = append(newPairs, cp) - } - } - - confirmedPairs, err := ConfirmPairAvailability( - p, - p.endpoints.Name, - p.logger, - newPairs..., - ) - if err != nil { - return - } - - p.setSubscribedPairs(confirmedPairs...) -} - // This function periodically calls setTickers to update the priceStore. func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { for { @@ -285,6 +303,3 @@ func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.Currenc } } } - -// StartConnections starts the websocket connections. -func (p AstroportProvider) StartConnections() {}