From 3db39799f46224ed8b124337043bc28d9d4f116f Mon Sep 17 00:00:00 2001 From: ryanbajollari <54822716+rbajollari@users.noreply.github.com> Date: Thu, 22 Aug 2024 18:11:11 +0200 Subject: [PATCH] feat: Separate eth providers into individual providers per exchange (#406) --- config/supported_assets.go | 36 ++-- oracle/oracle.go | 12 ++ oracle/provider/balancer.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/balancer_test.go | 116 ++++++++++++ oracle/provider/camelot.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/camelot_test.go | 116 ++++++++++++ oracle/provider/curve.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/curve_test.go | 116 ++++++++++++ oracle/provider/pancake.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/pancake_test.go | 116 ++++++++++++ oracle/provider/provider.go | 34 ++-- oracle/provider/uniswap.go | 4 +- 12 files changed, 1689 insertions(+), 33 deletions(-) create mode 100644 oracle/provider/balancer.go create mode 100644 oracle/provider/balancer_test.go create mode 100644 oracle/provider/camelot.go create mode 100644 oracle/provider/camelot_test.go create mode 100644 oracle/provider/curve.go create mode 100644 oracle/provider/curve_test.go create mode 100644 oracle/provider/pancake.go create mode 100644 oracle/provider/pancake_test.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 42532ef6..0aa7b4fd 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -11,22 +11,26 @@ var ( // SupportedProviders defines a lookup table of all the supported currency API // providers and whether or not they require an API key to be passed in. SupportedProviders = map[types.ProviderName]APIKeyRequired{ - provider.ProviderKraken: false, - provider.ProviderBinance: false, - provider.ProviderBinanceUS: false, - provider.ProviderOsmosis: false, - provider.ProviderOkx: false, - provider.ProviderHuobi: false, - provider.ProviderGate: false, - provider.ProviderCoinbase: false, - provider.ProviderBitget: false, - provider.ProviderMexc: false, - provider.ProviderCrypto: false, - provider.ProviderPolygon: true, - provider.ProviderEthUniswap: false, - provider.ProviderKujira: false, - provider.ProviderAstroport: false, - provider.ProviderMock: false, + provider.ProviderKraken: false, + provider.ProviderBinance: false, + provider.ProviderBinanceUS: false, + provider.ProviderOsmosis: false, + provider.ProviderOkx: false, + provider.ProviderHuobi: false, + provider.ProviderGate: false, + provider.ProviderCoinbase: false, + provider.ProviderBitget: false, + provider.ProviderMexc: false, + provider.ProviderCrypto: false, + provider.ProviderPolygon: true, + provider.ProviderEthUniswap: false, + provider.ProviderEthCamelot: false, + provider.ProviderEthBalancer: false, + provider.ProviderEthPancake: false, + provider.ProviderEthCurve: false, + provider.ProviderKujira: false, + provider.ProviderAstroport: false, + provider.ProviderMock: false, } // SupportedConversions defines a lookup table for which currency pairs we diff --git a/oracle/oracle.go b/oracle/oracle.go index 24646a9a..bb8e7fff 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -507,6 +507,18 @@ func NewProvider( case provider.ProviderEthUniswap: return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) + case provider.ProviderEthCamelot: + return provider.NewCamelotProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthBalancer: + return provider.NewBalancerProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthPancake: + return provider.NewPancakeProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthCurve: + return provider.NewCurveProvider(ctx, logger, endpoint, providerPairs...) + case provider.ProviderAstroport: return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...) } diff --git a/oracle/provider/balancer.go b/oracle/provider/balancer.go new file mode 100644 index 00000000..139f5f95 --- /dev/null +++ b/oracle/provider/balancer.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + balancerWSHost = "api.eth-api.prod.ojo.network" + balancerWSPath = "/balancer/ws" + balancerWSScheme = "wss" + balancerRestHost = "https://api.eth-api.prod.ojo.network" + balancerRestPath = "/balancer/assetpairs" + balancerAckMsg = "ack" +) + +var _ Provider = (*BalancerProvider)(nil) + +type ( + // BalancerProvider defines an Oracle provider implemented by OJO's + // Balancer API. + // + // REF: https://github.com/ojo-network/ehereum-api + BalancerProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + BalancerTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + BalancerCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // BalancerPairsSummary defines the response structure for an Balancer pairs + // summary. + BalancerPairsSummary struct { + Data []BalancerPairData `json:"data"` + } + + // BalancerPairData defines the data response structure for an Balancer pair. + BalancerPairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewBalancerProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*BalancerProvider, error) { + if endpoints.Name != ProviderEthBalancer { + endpoints = Endpoint{ + Name: ProviderEthBalancer, + Rest: balancerRestHost, + Websocket: balancerWSHost, + } + } + + wsURL := url.URL{ + Scheme: balancerWSScheme, + Host: endpoints.Websocket, + Path: balancerWSPath, + } + + balancerLogger := logger.With().Str("provider", "balancer").Logger() + + provider := &BalancerProvider{ + wsURL: wsURL, + logger: balancerLogger, + endpoints: endpoints, + priceStore: newPriceStore(balancerLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToBalancerPair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + balancerLogger, + ) + + return provider, nil +} + +func (p *BalancerProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *BalancerProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *BalancerProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == balancerAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp BalancerTicker + tickerErr error + candleResp []BalancerCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + balancerPair := currencyPairToBalancerPair(pair) + if msg, ok := messageResp[balancerPair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + balancerPair, + ) + telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + balancerPair, + ) + } + telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeCandle) + continue + } + } + } +} + +func (o BalancerTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o BalancerCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *BalancerProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *BalancerProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + balancerRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []BalancerPairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToBalancerPair receives a currency pair and return balancer +// ticker symbol atomusdt@ticker. +func currencyPairToBalancerPair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/balancer_test.go b/oracle/provider/balancer_test.go new file mode 100644 index 00000000..5266653e --- /dev/null +++ b/oracle/provider/balancer_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestBalancerProvider_GetTickerPrices(t *testing.T) { + p, err := NewBalancerProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestBalancerProvider_GetCandlePrices(t *testing.T) { + p, err := NewBalancerProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := BalancerCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestBalancerCurrencyPairToBalancerPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + balancerSymbol := currencyPairToBalancerPair(cp) + require.Equal(t, balancerSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/camelot.go b/oracle/provider/camelot.go new file mode 100644 index 00000000..eb0b669a --- /dev/null +++ b/oracle/provider/camelot.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + camelotWSHost = "api.eth-api.prod.ojo.network" + camelotWSPath = "/camelot/ws" + camelotWSScheme = "wss" + camelotRestHost = "https://api.eth-api.prod.ojo.network" + camelotRestPath = "/camelot/assetpairs" + camelotAckMsg = "ack" +) + +var _ Provider = (*CamelotProvider)(nil) + +type ( + // CamelotProvider defines an Oracle provider implemented by OJO's + // Camelot API. + // + // REF: https://github.com/ojo-network/ehereum-api + CamelotProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + CamelotTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + CamelotCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // CamelotPairsSummary defines the response structure for an Camelot pairs + // summary. + CamelotPairsSummary struct { + Data []CamelotPairData `json:"data"` + } + + // CamelotPairData defines the data response structure for an Camelot pair. + CamelotPairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewCamelotProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*CamelotProvider, error) { + if endpoints.Name != ProviderEthCamelot { + endpoints = Endpoint{ + Name: ProviderEthCamelot, + Rest: camelotRestHost, + Websocket: camelotWSHost, + } + } + + wsURL := url.URL{ + Scheme: camelotWSScheme, + Host: endpoints.Websocket, + Path: camelotWSPath, + } + + camelotLogger := logger.With().Str("provider", "camelot").Logger() + + provider := &CamelotProvider{ + wsURL: wsURL, + logger: camelotLogger, + endpoints: endpoints, + priceStore: newPriceStore(camelotLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToCamelotPair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + camelotLogger, + ) + + return provider, nil +} + +func (p *CamelotProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *CamelotProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *CamelotProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == camelotAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp CamelotTicker + tickerErr error + candleResp []CamelotCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + camelotPair := currencyPairToCamelotPair(pair) + if msg, ok := messageResp[camelotPair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + camelotPair, + ) + telemetryWebsocketMessage(ProviderEthCamelot, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + camelotPair, + ) + } + telemetryWebsocketMessage(ProviderEthCamelot, MessageTypeCandle) + continue + } + } + } +} + +func (o CamelotTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("camelot: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("camelot: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o CamelotCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("camelot: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("camelot: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *CamelotProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *CamelotProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + camelotRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []CamelotPairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToCamelotPair receives a currency pair and return camelot +// ticker symbol atomusdt@ticker. +func currencyPairToCamelotPair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/camelot_test.go b/oracle/provider/camelot_test.go new file mode 100644 index 00000000..bd460093 --- /dev/null +++ b/oracle/provider/camelot_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestCamelotProvider_GetTickerPrices(t *testing.T) { + p, err := NewCamelotProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCamelotProvider_GetCandlePrices(t *testing.T) { + p, err := NewCamelotProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := CamelotCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCamelotCurrencyPairToCamelotPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + camelotSymbol := currencyPairToCamelotPair(cp) + require.Equal(t, camelotSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/curve.go b/oracle/provider/curve.go new file mode 100644 index 00000000..76086143 --- /dev/null +++ b/oracle/provider/curve.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + curveWSHost = "api.eth-api.prod.ojo.network" + curveWSPath = "/curve/ws" + curveWSScheme = "wss" + curveRestHost = "https://api.eth-api.prod.ojo.network" + curveRestPath = "/curve/assetpairs" + curveAckMsg = "ack" +) + +var _ Provider = (*CurveProvider)(nil) + +type ( + // CurveProvider defines an Oracle provider implemented by OJO's + // Curve API. + // + // REF: https://github.com/ojo-network/ehereum-api + CurveProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + CurveTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + CurveCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // CurvePairsSummary defines the response structure for an Curve pairs + // summary. + CurvePairsSummary struct { + Data []CurvePairData `json:"data"` + } + + // CurvePairData defines the data response structure for an Curve pair. + CurvePairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewCurveProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*CurveProvider, error) { + if endpoints.Name != ProviderEthCurve { + endpoints = Endpoint{ + Name: ProviderEthCurve, + Rest: curveRestHost, + Websocket: curveWSHost, + } + } + + wsURL := url.URL{ + Scheme: curveWSScheme, + Host: endpoints.Websocket, + Path: curveWSPath, + } + + curveLogger := logger.With().Str("provider", "curve").Logger() + + provider := &CurveProvider{ + wsURL: wsURL, + logger: curveLogger, + endpoints: endpoints, + priceStore: newPriceStore(curveLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToCurvePair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + curveLogger, + ) + + return provider, nil +} + +func (p *CurveProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *CurveProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *CurveProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == curveAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp CurveTicker + tickerErr error + candleResp []CurveCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + curvePair := currencyPairToCurvePair(pair) + if msg, ok := messageResp[curvePair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + curvePair, + ) + telemetryWebsocketMessage(ProviderEthCurve, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + curvePair, + ) + } + telemetryWebsocketMessage(ProviderEthCurve, MessageTypeCandle) + continue + } + } + } +} + +func (o CurveTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("curve: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("curve: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o CurveCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("curve: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("curve: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *CurveProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *CurveProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + curveRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []CurvePairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToCurvePair receives a currency pair and return curve +// ticker symbol atomusdt@ticker. +func currencyPairToCurvePair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/curve_test.go b/oracle/provider/curve_test.go new file mode 100644 index 00000000..4c9a8998 --- /dev/null +++ b/oracle/provider/curve_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestCurveProvider_GetTickerPrices(t *testing.T) { + p, err := NewCurveProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCurveProvider_GetCandlePrices(t *testing.T) { + p, err := NewCurveProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := CurveCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCurveCurrencyPairToCurvePair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + curveSymbol := currencyPairToCurvePair(cp) + require.Equal(t, curveSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/pancake.go b/oracle/provider/pancake.go new file mode 100644 index 00000000..999da952 --- /dev/null +++ b/oracle/provider/pancake.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + pancakeWSHost = "api.eth-api.prod.ojo.network" + pancakeWSPath = "/pancake/ws" + pancakeWSScheme = "wss" + pancakeRestHost = "https://api.eth-api.prod.ojo.network" + pancakeRestPath = "/pancake/assetpairs" + pancakeAckMsg = "ack" +) + +var _ Provider = (*PancakeProvider)(nil) + +type ( + // PancakeProvider defines an Oracle provider implemented by OJO's + // Pancake API. + // + // REF: https://github.com/ojo-network/ehereum-api + PancakeProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + PancakeTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + PancakeCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // PancakePairsSummary defines the response structure for an Pancake pairs + // summary. + PancakePairsSummary struct { + Data []PancakePairData `json:"data"` + } + + // PancakePairData defines the data response structure for an Pancake pair. + PancakePairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewPancakeProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*PancakeProvider, error) { + if endpoints.Name != ProviderEthPancake { + endpoints = Endpoint{ + Name: ProviderEthPancake, + Rest: pancakeRestHost, + Websocket: pancakeWSHost, + } + } + + wsURL := url.URL{ + Scheme: pancakeWSScheme, + Host: endpoints.Websocket, + Path: pancakeWSPath, + } + + pancakeLogger := logger.With().Str("provider", "pancake").Logger() + + provider := &PancakeProvider{ + wsURL: wsURL, + logger: pancakeLogger, + endpoints: endpoints, + priceStore: newPriceStore(pancakeLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToPancakePair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + pancakeLogger, + ) + + return provider, nil +} + +func (p *PancakeProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *PancakeProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *PancakeProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == pancakeAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp PancakeTicker + tickerErr error + candleResp []PancakeCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + pancakePair := currencyPairToPancakePair(pair) + if msg, ok := messageResp[pancakePair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + pancakePair, + ) + telemetryWebsocketMessage(ProviderEthPancake, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + pancakePair, + ) + } + telemetryWebsocketMessage(ProviderEthPancake, MessageTypeCandle) + continue + } + } + } +} + +func (o PancakeTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("pancake: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("pancake: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o PancakeCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("pancake: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("pancake: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *PancakeProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *PancakeProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + pancakeRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []PancakePairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToPancakePair receives a currency pair and return pancake +// ticker symbol atomusdt@ticker. +func currencyPairToPancakePair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/pancake_test.go b/oracle/provider/pancake_test.go new file mode 100644 index 00000000..a1a38c33 --- /dev/null +++ b/oracle/provider/pancake_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestPancakeProvider_GetTickerPrices(t *testing.T) { + p, err := NewPancakeProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestPancakeProvider_GetCandlePrices(t *testing.T) { + p, err := NewPancakeProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := PancakeCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestPancakeCurrencyPairToPancakePair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + pancakeSymbol := currencyPairToPancakePair(cp) + require.Equal(t, pancakeSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/provider.go b/oracle/provider/provider.go index 5e954c3c..e5611dd9 100644 --- a/oracle/provider/provider.go +++ b/oracle/provider/provider.go @@ -9,21 +9,25 @@ import ( const ( defaultTimeout = 10 * time.Second - ProviderKraken types.ProviderName = "kraken" - ProviderBinance types.ProviderName = "binance" - ProviderBinanceUS types.ProviderName = "binanceus" - ProviderOsmosis types.ProviderName = "osmosis" - ProviderHuobi types.ProviderName = "huobi" - ProviderOkx types.ProviderName = "okx" - ProviderGate types.ProviderName = "gate" - ProviderCoinbase types.ProviderName = "coinbase" - ProviderBitget types.ProviderName = "bitget" - ProviderMexc types.ProviderName = "mexc" - ProviderCrypto types.ProviderName = "crypto" - ProviderPolygon types.ProviderName = "polygon" - ProviderEthUniswap types.ProviderName = "eth-uniswap" - ProviderKujira types.ProviderName = "kujira" - ProviderMock types.ProviderName = "mock" + ProviderKraken types.ProviderName = "kraken" + ProviderBinance types.ProviderName = "binance" + ProviderBinanceUS types.ProviderName = "binanceus" + ProviderOsmosis types.ProviderName = "osmosis" + ProviderHuobi types.ProviderName = "huobi" + ProviderOkx types.ProviderName = "okx" + ProviderGate types.ProviderName = "gate" + ProviderCoinbase types.ProviderName = "coinbase" + ProviderBitget types.ProviderName = "bitget" + ProviderMexc types.ProviderName = "mexc" + ProviderCrypto types.ProviderName = "crypto" + ProviderPolygon types.ProviderName = "polygon" + ProviderEthUniswap types.ProviderName = "eth-uniswap" + ProviderEthCamelot types.ProviderName = "eth-camelot" + ProviderEthBalancer types.ProviderName = "eth-balancer" + ProviderEthPancake types.ProviderName = "eth-pancake" + ProviderEthCurve types.ProviderName = "eth-curve" + ProviderKujira types.ProviderName = "kujira" + ProviderMock types.ProviderName = "mock" ) var ( diff --git a/oracle/provider/uniswap.go b/oracle/provider/uniswap.go index 586305fd..7f268665 100644 --- a/oracle/provider/uniswap.go +++ b/oracle/provider/uniswap.go @@ -17,10 +17,10 @@ import ( const ( uniswapWSHost = "api.eth-api.prod.ojo.network" - uniswapWSPath = "ws" + uniswapWSPath = "/uniswap/ws" uniswapWSScheme = "wss" uniswapRestHost = "https://api.eth-api.prod.ojo.network" - uniswapRestPath = "/assetpairs" + uniswapRestPath = "/uniswap/assetpairs" uniswapAckMsg = "ack" )