Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
adamewozniak committed Nov 22, 2023
1 parent 5ee1aab commit fc441d4
Showing 1 changed file with 81 additions and 66 deletions.
147 changes: 81 additions & 66 deletions oracle/provider/astroport.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
priceStore
}

// AstroportAssetResponse is the response from the Astroport assets endpoint.
AstroportAssetResponse struct {
BaseID string `json:"base_id"`
BaseName string `json:"base_name"`
Expand All @@ -47,7 +48,7 @@ type (
QuoteVolume float64 `json:"quote_volume"`
USDVolume float64 `json:"USD_volume"`
}

// AstroportTickersResponse is the response from the Astroport tickers endpoint.
AstroportTickersResponse struct {
TickerID string `json:"ticker_id"`
BaseCurrency string `json:"base_currency"`
Expand All @@ -58,28 +59,10 @@ type (
TargetVolume float64 `json:"target_volume"`
PoolID string `json:"pool_id"`
}

AstroportTickerPairs struct {
ticker AstroportTickersResponse
pair types.CurrencyPair
}
)

func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) {
lp, err := decmath.NewDecFromFloat(atr.LastPrice)
if err != nil {
return types.TickerPrice{}, err
}
volume, err := decmath.NewDecFromFloat(atr.BaseVolume)
if err != nil {
return types.TickerPrice{}, err
}
return types.TickerPrice{
Price: lp,
Volume: volume,
}, nil
}

// NewAstroportProvider returns a new AstroportProvider.
// It also starts a go routine to poll for new data.
func NewAstroportProvider(
ctx context.Context,
logger zerolog.Logger,
Expand Down Expand Up @@ -125,7 +108,77 @@ func NewAstroportProvider(
return provider, nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
// GetAvailablePairs return all available pair symbols.
func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) {
availablePairs, _, err := p.getTickerMaps()
if err != nil {
return nil, err
}

availableSymbols := map[string]struct{}{}
for _, pair := range availablePairs {
availableSymbols[pair.String()] = struct{}{}
}

return availableSymbols, nil
}

// SubscribeCurrencyPairs sends the new subscription messages to the websocket
// and adds them to the providers subscribedPairs array.
func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

newPairs := []types.CurrencyPair{}
for _, cp := range cps {
if _, ok := p.subscribedPairs[cp.String()]; !ok {
newPairs = append(newPairs, cp)
}
}

confirmedPairs, err := ConfirmPairAvailability(
p,
p.endpoints.Name,
p.logger,
newPairs...,
)
if err != nil {
return
}

p.setSubscribedPairs(confirmedPairs...)
}

// StartConnections starts the websocket connections.
// This function is a no-op for the astroport provider.
func (p AstroportProvider) StartConnections() {}

// AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the
// corresponding pair. It satisfies the TickerPrice interface.
type AstroportTickerPairs struct {
ticker AstroportTickersResponse
pair types.CurrencyPair
}

// toTickerPrice converts the AstroportTickerPairs to a TickerPrice.
// It satisfies the TickerPrice interface.
func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) {
lp, err := decmath.NewDecFromFloat(atr.LastPrice)
if err != nil {
return types.TickerPrice{}, err
}
volume, err := decmath.NewDecFromFloat(atr.BaseVolume)
if err != nil {
return types.TickerPrice{}, err
}
return types.TickerPrice{
Price: lp,
Volume: volume,
}, nil
}

// setTickers queries the Astroport API for the latest tickers and updates the
// priceStore.
func (p AstroportProvider) setTickers() error {
tickers, err := p.queryTickers()
if err != nil {
Expand All @@ -137,6 +190,8 @@ func (p AstroportProvider) setTickers() error {
return nil
}

// findTickersForPairs returns a map of ticker IDs -> pairs, but filters out
// pairs that we are not subscribed to.
func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) {
queryingPairs := p.subscribedPairs
_, pairToTickerIDMap, err := p.getTickerMaps()
Expand All @@ -154,6 +209,8 @@ func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair,
return tickerIDs, nil
}

// getTickerMaps returns all available assets from the api.
// It returns a map of ticker IDs -> pairs and a map of pairs -> ticker IDs.
func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) {
res, err := p.client.Get(p.endpoints.Rest + assetsURL)
if err != nil {
Expand Down Expand Up @@ -189,6 +246,7 @@ func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[s
return availablePairs, pairToTickerID, nil
}

// queryTickers returns the AstroportTickerPairs available from the API.
func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) {
res, err := p.client.Get(p.endpoints.Rest + tickersURL)
if err != nil {
Expand All @@ -210,6 +268,7 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) {
if err != nil {
return nil, err
}
// filter out tickers that we are not subscribed to
tickers := []AstroportTickerPairs{}
for tickerID, v := range tickerMap {
for _, ticker := range astroportTickers {
Expand All @@ -225,47 +284,6 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) {
return tickers, nil
}

// GetAvailablePairs return all available pairs symbol to subscribe.
func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) {
availablePairs, _, err := p.getTickerMaps()
if err != nil {
return nil, err
}

availableSymbols := map[string]struct{}{}
for _, pair := range availablePairs {
availableSymbols[pair.String()] = struct{}{}
}

return availableSymbols, nil
}

// SubscribeCurrencyPairs sends the new subscription messages to the websocket
// and adds them to the providers subscribedPairs array
func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

newPairs := []types.CurrencyPair{}
for _, cp := range cps {
if _, ok := p.subscribedPairs[cp.String()]; !ok {
newPairs = append(newPairs, cp)
}
}

confirmedPairs, err := ConfirmPairAvailability(
p,
p.endpoints.Name,
p.logger,
newPairs...,
)
if err != nil {
return
}

p.setSubscribedPairs(confirmedPairs...)
}

// This function periodically calls setTickers to update the priceStore.
func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error {

Check failure on line 288 in oracle/provider/astroport.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

parameter 'pairs' seems to be unused, consider removing or renaming it as _
for {
Expand All @@ -285,6 +303,3 @@ func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.Currenc
}
}
}

// StartConnections starts the websocket connections.
func (p AstroportProvider) StartConnections() {}

0 comments on commit fc441d4

Please sign in to comment.