Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Separate eth providers into individual providers per exchange #406

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions config/supported_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,26 @@ var (
// SupportedProviders defines a lookup table of all the supported currency API
// providers and whether or not they require an API key to be passed in.
SupportedProviders = map[types.ProviderName]APIKeyRequired{
provider.ProviderKraken: false,
provider.ProviderBinance: false,
provider.ProviderBinanceUS: false,
provider.ProviderOsmosis: false,
provider.ProviderOkx: false,
provider.ProviderHuobi: false,
provider.ProviderGate: false,
provider.ProviderCoinbase: false,
provider.ProviderBitget: false,
provider.ProviderMexc: false,
provider.ProviderCrypto: false,
provider.ProviderPolygon: true,
provider.ProviderEthUniswap: false,
provider.ProviderKujira: false,
provider.ProviderAstroport: false,
provider.ProviderMock: false,
provider.ProviderKraken: false,
provider.ProviderBinance: false,
provider.ProviderBinanceUS: false,
provider.ProviderOsmosis: false,
provider.ProviderOkx: false,
provider.ProviderHuobi: false,
provider.ProviderGate: false,
provider.ProviderCoinbase: false,
provider.ProviderBitget: false,
provider.ProviderMexc: false,
provider.ProviderCrypto: false,
provider.ProviderPolygon: true,
provider.ProviderEthUniswap: false,
provider.ProviderEthCamelot: false,
provider.ProviderEthBalancer: false,
provider.ProviderEthPancake: false,
provider.ProviderEthCurve: false,
provider.ProviderKujira: false,
provider.ProviderAstroport: false,
provider.ProviderMock: false,
}

// SupportedConversions defines a lookup table for which currency pairs we
Expand Down
12 changes: 12 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,18 @@ func NewProvider(
case provider.ProviderEthUniswap:
return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...)

case provider.ProviderEthCamelot:
return provider.NewCamelotProvider(ctx, logger, endpoint, providerPairs...)
rbajollari marked this conversation as resolved.
Show resolved Hide resolved

case provider.ProviderEthBalancer:
return provider.NewBalancerProvider(ctx, logger, endpoint, providerPairs...)

case provider.ProviderEthPancake:
return provider.NewPancakeProvider(ctx, logger, endpoint, providerPairs...)

case provider.ProviderEthCurve:
return provider.NewCurveProvider(ctx, logger, endpoint, providerPairs...)

case provider.ProviderAstroport:
return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...)
}
Expand Down
293 changes: 293 additions & 0 deletions oracle/provider/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package provider

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"

"cosmossdk.io/math"
"github.com/gorilla/websocket"
"github.com/ojo-network/price-feeder/oracle/types"
"github.com/rs/zerolog"
)

const (
balancerWSHost = "api.eth-api.prod.ojo.network"
balancerWSPath = "/balancer/ws"
balancerWSScheme = "wss"
balancerRestHost = "https://api.eth-api.prod.ojo.network"
balancerRestPath = "/balancer/assetpairs"
balancerAckMsg = "ack"
)

var _ Provider = (*BalancerProvider)(nil)

type (
// BalancerProvider defines an Oracle provider implemented by OJO's
// Balancer API.
//
// REF: https://github.com/ojo-network/ehereum-api
BalancerProvider struct {
wsc *WebsocketController
wsURL url.URL
logger zerolog.Logger
mtx sync.RWMutex
endpoints Endpoint

priceStore
}

BalancerTicker struct {
Price string `json:"Price"`
Volume string `json:"Volume"`
}

BalancerCandle struct {
Close string `json:"Close"`
Volume string `json:"Volume"`
EndTime int64 `json:"EndTime"`
}

// BalancerPairsSummary defines the response structure for an Balancer pairs
// summary.
BalancerPairsSummary struct {
Data []BalancerPairData `json:"data"`
}

// BalancerPairData defines the data response structure for an Balancer pair.
BalancerPairData struct {
Base string `json:"base"`
Quote string `json:"quote"`
}
)

func NewBalancerProvider(
ctx context.Context,
logger zerolog.Logger,
endpoints Endpoint,
pairs ...types.CurrencyPair,
) (*BalancerProvider, error) {
if endpoints.Name != ProviderEthBalancer {
endpoints = Endpoint{
Name: ProviderEthBalancer,
Rest: balancerRestHost,
Websocket: balancerWSHost,
}
}

wsURL := url.URL{
Scheme: balancerWSScheme,
Host: endpoints.Websocket,
Path: balancerWSPath,
}

balancerLogger := logger.With().Str("provider", "balancer").Logger()

provider := &BalancerProvider{
wsURL: wsURL,
logger: balancerLogger,
endpoints: endpoints,
priceStore: newPriceStore(balancerLogger),
}
provider.setCurrencyPairToTickerAndCandlePair(currencyPairToBalancerPair)

confirmedPairs, err := ConfirmPairAvailability(
provider,
provider.endpoints.Name,
provider.logger,
pairs...,
)
if err != nil {
return nil, err
}

provider.setSubscribedPairs(confirmedPairs...)

provider.wsc = NewWebsocketController(
ctx,
endpoints.Name,
wsURL,
[]interface{}{""},
provider.messageReceived,
defaultPingDuration,
websocket.PingMessage,
balancerLogger,
)

return provider, nil
}

func (p *BalancerProvider) StartConnections() {
p.wsc.StartConnections()
}

// SubscribeCurrencyPairs sends the new subscription messages to the websocket
// and adds them to the providers subscribedPairs array
func (p *BalancerProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

confirmedPairs, err := ConfirmPairAvailability(
p,
p.endpoints.Name,
p.logger,
cps...,
)
if err != nil {
return
}

p.setSubscribedPairs(confirmedPairs...)
}

func (p *BalancerProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) {
// check if message is an ack
if string(bz) == balancerAckMsg {
return
}

var (
messageResp map[string]interface{}
messageErr error
tickerResp BalancerTicker
tickerErr error
candleResp []BalancerCandle
candleErr error
)

messageErr = json.Unmarshal(bz, &messageResp)
if messageErr != nil {
p.logger.Error().
Int("length", len(bz)).
AnErr("message", messageErr).
Msg("Error on receive message")
}

// Check the response for currency pairs that the provider is subscribed
// to and determine whether it is a ticker or candle.
for _, pair := range p.subscribedPairs {
balancerPair := currencyPairToBalancerPair(pair)
if msg, ok := messageResp[balancerPair]; ok {
switch v := msg.(type) {
// ticker response
case map[string]interface{}:
tickerString, _ := json.Marshal(v)
tickerErr = json.Unmarshal(tickerString, &tickerResp)
if tickerErr != nil {
p.logger.Error().
Int("length", len(bz)).
AnErr("ticker", tickerErr).
Msg("Error on receive message")
continue
}
p.setTickerPair(
tickerResp,
balancerPair,
)
telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeTicker)
continue

// candle response
case []interface{}:
// use latest candlestick in list if there is one
if len(v) == 0 {
continue
}
candleString, _ := json.Marshal(v)
candleErr = json.Unmarshal(candleString, &candleResp)
if candleErr != nil {
p.logger.Error().
Int("length", len(bz)).
AnErr("candle", candleErr).
Msg("Error on receive message")
continue
}
for _, singleCandle := range candleResp {
p.setCandlePair(
singleCandle,
balancerPair,
)
}
telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeCandle)
continue
}
}
}
}

func (o BalancerTicker) toTickerPrice() (types.TickerPrice, error) {
price, err := math.LegacyNewDecFromStr(o.Price)
if err != nil {
return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker price: %w", err)
}
volume, err := math.LegacyNewDecFromStr(o.Volume)
if err != nil {
return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker volume: %w", err)
}

tickerPrice := types.TickerPrice{
Price: price,
Volume: volume,
}
return tickerPrice, nil
}

func (o BalancerCandle) toCandlePrice() (types.CandlePrice, error) {
close, err := math.LegacyNewDecFromStr(o.Close)
if err != nil {
return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle price: %w", err)
}
volume, err := math.LegacyNewDecFromStr(o.Volume)
if err != nil {
return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle volume: %w", err)
}
candlePrice := types.CandlePrice{
Price: close,
Volume: volume,
TimeStamp: o.EndTime,
}
return candlePrice, nil
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *BalancerProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
for _, cp := range cps {
p.subscribedPairs[cp.String()] = cp
}
}

// GetAvailablePairs returns all pairs to which the provider can subscribe.
// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}].
func (p *BalancerProvider) GetAvailablePairs() (map[string]struct{}, error) {
resp, err := http.Get(p.endpoints.Rest + balancerRestPath)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var pairsSummary []BalancerPairData
if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil {
return nil, err
}

availablePairs := make(map[string]struct{}, len(pairsSummary))
for _, pair := range pairsSummary {
cp := types.CurrencyPair{
Base: pair.Base,
Quote: pair.Quote,
}
availablePairs[strings.ToUpper(cp.String())] = struct{}{}
}

return availablePairs, nil
}

// currencyPairToBalancerPair receives a currency pair and return balancer
// ticker symbol atomusdt@ticker.
func currencyPairToBalancerPair(cp types.CurrencyPair) string {
return cp.Base + "/" + cp.Quote
}
Loading
Loading