From 2216d80966eea3b743dd8fcf469e6b73d2d37cfd Mon Sep 17 00:00:00 2001 From: ryanbajollari <54822716+rbajollari@users.noreply.github.com> Date: Fri, 16 Aug 2024 00:31:34 +0200 Subject: [PATCH 1/3] chore: Remove crescent provider (#402) --- config/currency_provider_tracker.go | 60 +------ config/supported_assets.go | 1 - oracle/oracle.go | 3 - oracle/provider/crescent.go | 269 ---------------------------- oracle/provider/crescent_test.go | 116 ------------ oracle/provider/provider.go | 1 - 6 files changed, 9 insertions(+), 441 deletions(-) delete mode 100644 oracle/provider/crescent.go delete mode 100644 oracle/provider/crescent_test.go diff --git a/config/currency_provider_tracker.go b/config/currency_provider_tracker.go index 221f4ba7..1a797162 100644 --- a/config/currency_provider_tracker.go +++ b/config/currency_provider_tracker.go @@ -14,15 +14,13 @@ import ( ) const ( - coinGeckoRestURL = "https://api.coingecko.com/api/v3/coins" - coinGeckoListEndpoint = "list" - coinGeckoTickersEndpoint = "tickers" - osmosisRestURL = "https://api.osmo-api.prod.ojo.network" - osmosisAssetPairsEndpoint = "assetpairs" - crescentRestURL = "https://api.cresc-api.prod.ojo.network" - crescentAssetPairsEndpoint = "assetpairs" - requestTimeout = time.Second * 2 - trackingPeriod = time.Hour * 24 + coinGeckoRestURL = "https://api.coingecko.com/api/v3/coins" + coinGeckoListEndpoint = "list" + coinGeckoTickersEndpoint = "tickers" + osmosisRestURL = "https://api.osmo-api.prod.ojo.network" + osmosisAssetPairsEndpoint = "assetpairs" + requestTimeout = time.Second * 2 + trackingPeriod = time.Hour * 24 ) type ( @@ -90,12 +88,7 @@ func NewCurrencyProviderTracker( return nil, err } - crescentAPIPairs, err := currencyProviderTracker.getCrescentAPIPairs() - if err != nil { - return nil, err - } - - if err := currencyProviderTracker.setCurrencyProviders(osmosisAPIPairs, crescentAPIPairs); err != nil { + if err := currencyProviderTracker.setCurrencyProviders(osmosisAPIPairs); err != nil { return nil, err } @@ -159,36 +152,10 @@ func (t *CurrencyProviderTracker) getOsmosisAPIPairs() (map[string]string, error return osmosisAPIPairs, nil } -// getCrescentAPIPairs queries the crescent-api assetpairs endpoint to get the asset pairs -// supported by it. -func (t *CurrencyProviderTracker) getCrescentAPIPairs() (map[string]string, error) { - client := &http.Client{ - Timeout: requestTimeout, - } - crescentAPIPairs := make(map[string]string) - - crescentResp, err := client.Get(fmt.Sprintf("%s/%s", crescentRestURL, crescentAssetPairsEndpoint)) - if err != nil { - return nil, err - } - defer crescentResp.Body.Close() - var assetPairsResponse []assetPair - if err = json.NewDecoder(crescentResp.Body).Decode(&assetPairsResponse); err != nil { - return nil, err - } - - for _, assetPair := range assetPairsResponse { - crescentAPIPairs[assetPair.Base] = assetPair.Quote - } - - return crescentAPIPairs, nil -} - // setCurrencyProviders queries CoinGecko's tickers endpoint to get all the exchanges // that support each price feeder currency pair and store it in the CurrencyProviders map. func (t *CurrencyProviderTracker) setCurrencyProviders( osmosisAPIPairs map[string]string, - crescentAPIPairs map[string]string, ) error { client := &http.Client{ Timeout: requestTimeout, @@ -199,11 +166,6 @@ func (t *CurrencyProviderTracker) setCurrencyProviders( t.CurrencyProviders[pair.Base] = append(t.CurrencyProviders[pair.Base], "osmosis") } - // check if its a pair supported by the crescent api - if crescentAPIPairs[strings.ToUpper(pair.Base)] == strings.ToUpper(pair.Quote) { - t.CurrencyProviders[pair.Base] = append(t.CurrencyProviders[pair.Base], "crescent") - } - // check if CoinGecko API supports pair pairBaseID := t.coinIDSymbolMap[strings.ToLower(pair.Base)] coinGeckoResp, err := client.Get(fmt.Sprintf("%s/%s/%s", coinGeckoRestURL, pairBaseID, coinGeckoTickersEndpoint)) @@ -259,11 +221,7 @@ func (t *CurrencyProviderTracker) trackCurrencyProviders(ctx context.Context) { if err != nil { t.logger.Error().Err(err).Msg("failed to query osmosis-api for available asset pairs") } - crescentAPIPairs, err := t.getOsmosisAPIPairs() - if err != nil { - t.logger.Error().Err(err).Msg("failed to query crescent-api for available asset pairs") - } - if err := t.setCurrencyProviders(osmosisAPIPairs, crescentAPIPairs); err != nil { + if err := t.setCurrencyProviders(osmosisAPIPairs); err != nil { t.logger.Error().Err(err).Msg("failed to set available providers for currencies") } diff --git a/config/supported_assets.go b/config/supported_assets.go index 69e1eb41..3cff2ca7 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -14,7 +14,6 @@ var ( provider.ProviderKraken: false, provider.ProviderBinance: false, provider.ProviderBinanceUS: false, - provider.ProviderCrescent: false, provider.ProviderOsmosis: false, provider.ProviderOkx: false, provider.ProviderHuobi: false, diff --git a/oracle/oracle.go b/oracle/oracle.go index 487c81ea..24646a9a 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -498,9 +498,6 @@ func NewProvider( case provider.ProviderPolygon: return provider.NewPolygonProvider(ctx, logger, endpoint, providerPairs...) - case provider.ProviderCrescent: - return provider.NewCrescentProvider(ctx, logger, endpoint, providerPairs...) - case provider.ProviderKujira: return provider.NewKujiraProvider(ctx, logger, endpoint, providerPairs...) diff --git a/oracle/provider/crescent.go b/oracle/provider/crescent.go deleted file mode 100644 index ed70efab..00000000 --- a/oracle/provider/crescent.go +++ /dev/null @@ -1,269 +0,0 @@ -package provider - -import ( - "context" - "encoding/json" - "net/http" - "net/url" - "strings" - "sync" - - "github.com/gorilla/websocket" - "github.com/ojo-network/price-feeder/oracle/types" - "github.com/rs/zerolog" -) - -const ( - crescentV2WSHost = "api.cresc-api.prod.ojo.network" - crescentV2WSPath = "ws" - crescentV2RestHost = "https://api.cresc-api.prod.ojo.network" - crescentV2RestPath = "/assetpairs" - crescentAckMsg = "ack" -) - -var _ Provider = (*CrescentProvider)(nil) - -type ( - // CrescentProvider defines an Oracle provider implemented by OJO's - // Crescent API. - CrescentProvider struct { - wsc *WebsocketController - wsURL url.URL - logger zerolog.Logger - mtx sync.RWMutex - endpoints Endpoint - - priceStore - } - - CrescentTicker struct { - Price string `json:"Price"` - Volume string `json:"Volume"` - } - - CrescentCandle struct { - Close string `json:"Close"` - Volume string `json:"Volume"` - EndTime int64 `json:"EndTime"` - } - - // CrescentPairsSummary defines the response structure for an Crescent pairs - // summary. - CrescentPairsSummary struct { - Data []CrescentPairData `json:"data"` - } - - // CrescentPairData defines the data response structure for an Crescent pair. - CrescentPairData struct { - Base string `json:"base"` - Quote string `json:"quote"` - } -) - -func NewCrescentProvider( - ctx context.Context, - logger zerolog.Logger, - endpoints Endpoint, - pairs ...types.CurrencyPair, -) (*CrescentProvider, error) { - if endpoints.Name != ProviderCrescent { - endpoints = Endpoint{ - Name: ProviderCrescent, - Rest: crescentV2RestHost, - Websocket: crescentV2WSHost, - } - } - - wsURL := url.URL{ - Scheme: "wss", - Host: endpoints.Websocket, - Path: crescentV2WSPath, - } - - crescentV2Logger := logger.With().Str("provider", "crescent").Logger() - - provider := &CrescentProvider{ - wsURL: wsURL, - logger: crescentV2Logger, - endpoints: endpoints, - priceStore: newPriceStore(crescentV2Logger), - } - provider.setCurrencyPairToTickerAndCandlePair(currencyPairToCrescentPair) - - confirmedPairs, err := ConfirmPairAvailability( - provider, - provider.endpoints.Name, - provider.logger, - pairs..., - ) - if err != nil { - return nil, err - } - - provider.setSubscribedPairs(confirmedPairs...) - - provider.wsc = NewWebsocketController( - ctx, - endpoints.Name, - wsURL, - []interface{}{""}, - provider.messageReceived, - defaultPingDuration, - websocket.PingMessage, - crescentV2Logger, - ) - - return provider, nil -} - -func (p *CrescentProvider) StartConnections() { - p.wsc.StartConnections() -} - -// SubscribeCurrencyPairs sends the new subscription messages to the websocket -// and adds them to the providers subscribedPairs array -func (p *CrescentProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { - p.mtx.Lock() - defer p.mtx.Unlock() - - confirmedPairs, err := ConfirmPairAvailability( - p, - p.endpoints.Name, - p.logger, - cps..., - ) - if err != nil { - return - } - - p.setSubscribedPairs(confirmedPairs...) -} - -func (p *CrescentProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { - // check if message is an ack - if string(bz) == crescentAckMsg { - return - } - - var ( - messageResp map[string]interface{} - messageErr error - tickerResp CrescentTicker - tickerErr error - candleResp []CrescentCandle - candleErr error - ) - - messageErr = json.Unmarshal(bz, &messageResp) - if messageErr != nil { - p.logger.Error(). - Int("length", len(bz)). - AnErr("message", messageErr). - Msg("Error on receive message") - } - - // Check the response for currency pairs that the provider is subscribed - // to and determine whether it is a ticker or candle. - for _, pair := range p.subscribedPairs { - crescentPair := currencyPairToCrescentPair(pair) - if msg, ok := messageResp[crescentPair]; ok { - switch v := msg.(type) { - // ticker response - case map[string]interface{}: - tickerString, _ := json.Marshal(v) - tickerErr = json.Unmarshal(tickerString, &tickerResp) - if tickerErr != nil { - p.logger.Error(). - Int("length", len(bz)). - AnErr("ticker", tickerErr). - Msg("Error on receive message") - continue - } - p.setTickerPair( - tickerResp, - crescentPair, - ) - telemetryWebsocketMessage(ProviderCrescent, MessageTypeTicker) - continue - - // candle response - case []interface{}: - // use latest candlestick in list if there is one - if len(v) == 0 { - continue - } - candleString, _ := json.Marshal(v) - candleErr = json.Unmarshal(candleString, &candleResp) - if candleErr != nil { - p.logger.Error(). - Int("length", len(bz)). - AnErr("candle", candleErr). - Msg("Error on receive message") - continue - } - for _, singleCandle := range candleResp { - p.setCandlePair( - singleCandle, - crescentPair, - ) - } - telemetryWebsocketMessage(ProviderCrescent, MessageTypeCandle) - continue - } - } - } -} - -func (ct CrescentTicker) toTickerPrice() (types.TickerPrice, error) { - return types.NewTickerPrice( - ct.Price, - ct.Volume, - ) -} - -func (cc CrescentCandle) toCandlePrice() (types.CandlePrice, error) { - return types.NewCandlePrice( - cc.Close, - cc.Volume, - cc.EndTime, - ) -} - -// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. -func (p *CrescentProvider) setSubscribedPairs(cps ...types.CurrencyPair) { - for _, cp := range cps { - p.subscribedPairs[cp.String()] = cp - } -} - -// GetAvailablePairs returns all pairs to which the provider can subscribe. -// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. -func (p *CrescentProvider) GetAvailablePairs() (map[string]struct{}, error) { - resp, err := http.Get(p.endpoints.Rest + crescentV2RestPath) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var pairsSummary []CrescentPairData - if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { - return nil, err - } - - availablePairs := make(map[string]struct{}, len(pairsSummary)) - for _, pair := range pairsSummary { - cp := types.CurrencyPair{ - Base: pair.Base, - Quote: pair.Quote, - } - availablePairs[strings.ToUpper(cp.String())] = struct{}{} - } - - return availablePairs, nil -} - -// currencyPairToCrescentPair receives a currency pair and return crescent -// ticker symbol atomusdt@ticker. -func currencyPairToCrescentPair(cp types.CurrencyPair) string { - return cp.Base + "/" + cp.Quote -} diff --git a/oracle/provider/crescent_test.go b/oracle/provider/crescent_test.go deleted file mode 100644 index b87468ae..00000000 --- a/oracle/provider/crescent_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package provider - -import ( - "context" - "testing" - - "cosmossdk.io/math" - "github.com/ojo-network/price-feeder/oracle/types" - "github.com/rs/zerolog" - "github.com/stretchr/testify/require" -) - -func TestCrescentProvider_GetTickerPrices(t *testing.T) { - p, err := NewCrescentProvider( - context.TODO(), - zerolog.Nop(), - Endpoint{}, - BCREATOM, - ) - require.NoError(t, err) - - t.Run("valid_request_single_ticker", func(t *testing.T) { - lastPrice := math.LegacyMustNewDecFromStr("34.69000000") - volume := math.LegacyMustNewDecFromStr("2396974.02000000") - - tickerMap := map[string]types.TickerPrice{} - tickerMap["BCRE/ATOM"] = types.TickerPrice{ - Price: lastPrice, - Volume: volume, - } - - p.tickers = tickerMap - - prices, err := p.GetTickerPrices(BCREATOM) - require.NoError(t, err) - require.Len(t, prices, 1) - require.Equal(t, lastPrice, prices[BCREATOM].Price) - require.Equal(t, volume, prices[BCREATOM].Volume) - }) - - t.Run("valid_request_multi_ticker", func(t *testing.T) { - lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") - lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") - volume := math.LegacyMustNewDecFromStr("2396974.02000000") - - tickerMap := map[string]types.TickerPrice{} - tickerMap["ATOM/USDT"] = types.TickerPrice{ - Price: lastPriceAtom, - Volume: volume, - } - - tickerMap["LUNA/USDT"] = types.TickerPrice{ - Price: lastPriceLuna, - Volume: volume, - } - - p.tickers = tickerMap - prices, err := p.GetTickerPrices( - ATOMUSDT, - LUNAUSDT, - ) - require.NoError(t, err) - require.Len(t, prices, 2) - require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) - require.Equal(t, volume, prices[ATOMUSDT].Volume) - require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) - require.Equal(t, volume, prices[LUNAUSDT].Volume) - }) - - t.Run("invalid_request_invalid_ticker", func(t *testing.T) { - prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) - require.Empty(t, prices) - }) -} - -func TestCrescentProvider_GetCandlePrices(t *testing.T) { - p, err := NewCrescentProvider( - context.TODO(), - zerolog.Nop(), - Endpoint{}, - BCREATOM, - ) - require.NoError(t, err) - - t.Run("valid_request_single_candle", func(t *testing.T) { - price := "34.689998626708984000" - volume := "2396974.000000000000000000" - time := int64(1000000) - - candle := CrescentCandle{ - Volume: volume, - Close: price, - EndTime: time, - } - - p.setCandlePair(candle, "BCRE/ATOM") - - prices, err := p.GetCandlePrices(BCREATOM) - require.NoError(t, err) - require.Len(t, prices, 1) - require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[BCREATOM][0].Price) - require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[BCREATOM][0].Volume) - require.Equal(t, time, prices[BCREATOM][0].TimeStamp) - }) - - t.Run("invalid_request_invalid_candle", func(t *testing.T) { - prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) - require.Empty(t, prices) - }) -} - -func TestCrescentCurrencyPairToCrescentPair(t *testing.T) { - cp := types.CurrencyPair{Base: "BCRE", Quote: "USDT"} - crescentSymbol := currencyPairToCrescentPair(cp) - require.Equal(t, crescentSymbol, "BCRE/USDT") -} diff --git a/oracle/provider/provider.go b/oracle/provider/provider.go index f7c8b22d..5e954c3c 100644 --- a/oracle/provider/provider.go +++ b/oracle/provider/provider.go @@ -21,7 +21,6 @@ const ( ProviderMexc types.ProviderName = "mexc" ProviderCrypto types.ProviderName = "crypto" ProviderPolygon types.ProviderName = "polygon" - ProviderCrescent types.ProviderName = "crescent" ProviderEthUniswap types.ProviderName = "eth-uniswap" ProviderKujira types.ProviderName = "kujira" ProviderMock types.ProviderName = "mock" From f30aca44c0570872737ee19c588db79a65ed1d0f Mon Sep 17 00:00:00 2001 From: ryanbajollari <54822716+rbajollari@users.noreply.github.com> Date: Fri, 16 Aug 2024 00:35:11 +0200 Subject: [PATCH 2/3] fix: Add WBTC/BTC supported conversion (#401) --- config/supported_assets.go | 1 + 1 file changed, 1 insertion(+) diff --git a/config/supported_assets.go b/config/supported_assets.go index 3cff2ca7..42532ef6 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -46,6 +46,7 @@ var ( {Base: "OSMO", Quote: "USDT"}: {}, {Base: "JUNO", Quote: "USDT"}: {}, {Base: "WETH", Quote: "USDC"}: {}, + {Base: "WBTC", Quote: "BTC"}: {}, {Base: "WBTC", Quote: "WETH"}: {}, {Base: "INJ", Quote: "USDT"}: {}, {Base: "TIA", Quote: "USDT"}: {}, From 3db39799f46224ed8b124337043bc28d9d4f116f Mon Sep 17 00:00:00 2001 From: ryanbajollari <54822716+rbajollari@users.noreply.github.com> Date: Thu, 22 Aug 2024 18:11:11 +0200 Subject: [PATCH 3/3] feat: Separate eth providers into individual providers per exchange (#406) --- config/supported_assets.go | 36 ++-- oracle/oracle.go | 12 ++ oracle/provider/balancer.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/balancer_test.go | 116 ++++++++++++ oracle/provider/camelot.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/camelot_test.go | 116 ++++++++++++ oracle/provider/curve.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/curve_test.go | 116 ++++++++++++ oracle/provider/pancake.go | 293 +++++++++++++++++++++++++++++++ oracle/provider/pancake_test.go | 116 ++++++++++++ oracle/provider/provider.go | 34 ++-- oracle/provider/uniswap.go | 4 +- 12 files changed, 1689 insertions(+), 33 deletions(-) create mode 100644 oracle/provider/balancer.go create mode 100644 oracle/provider/balancer_test.go create mode 100644 oracle/provider/camelot.go create mode 100644 oracle/provider/camelot_test.go create mode 100644 oracle/provider/curve.go create mode 100644 oracle/provider/curve_test.go create mode 100644 oracle/provider/pancake.go create mode 100644 oracle/provider/pancake_test.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 42532ef6..0aa7b4fd 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -11,22 +11,26 @@ var ( // SupportedProviders defines a lookup table of all the supported currency API // providers and whether or not they require an API key to be passed in. SupportedProviders = map[types.ProviderName]APIKeyRequired{ - provider.ProviderKraken: false, - provider.ProviderBinance: false, - provider.ProviderBinanceUS: false, - provider.ProviderOsmosis: false, - provider.ProviderOkx: false, - provider.ProviderHuobi: false, - provider.ProviderGate: false, - provider.ProviderCoinbase: false, - provider.ProviderBitget: false, - provider.ProviderMexc: false, - provider.ProviderCrypto: false, - provider.ProviderPolygon: true, - provider.ProviderEthUniswap: false, - provider.ProviderKujira: false, - provider.ProviderAstroport: false, - provider.ProviderMock: false, + provider.ProviderKraken: false, + provider.ProviderBinance: false, + provider.ProviderBinanceUS: false, + provider.ProviderOsmosis: false, + provider.ProviderOkx: false, + provider.ProviderHuobi: false, + provider.ProviderGate: false, + provider.ProviderCoinbase: false, + provider.ProviderBitget: false, + provider.ProviderMexc: false, + provider.ProviderCrypto: false, + provider.ProviderPolygon: true, + provider.ProviderEthUniswap: false, + provider.ProviderEthCamelot: false, + provider.ProviderEthBalancer: false, + provider.ProviderEthPancake: false, + provider.ProviderEthCurve: false, + provider.ProviderKujira: false, + provider.ProviderAstroport: false, + provider.ProviderMock: false, } // SupportedConversions defines a lookup table for which currency pairs we diff --git a/oracle/oracle.go b/oracle/oracle.go index 24646a9a..bb8e7fff 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -507,6 +507,18 @@ func NewProvider( case provider.ProviderEthUniswap: return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) + case provider.ProviderEthCamelot: + return provider.NewCamelotProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthBalancer: + return provider.NewBalancerProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthPancake: + return provider.NewPancakeProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderEthCurve: + return provider.NewCurveProvider(ctx, logger, endpoint, providerPairs...) + case provider.ProviderAstroport: return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...) } diff --git a/oracle/provider/balancer.go b/oracle/provider/balancer.go new file mode 100644 index 00000000..139f5f95 --- /dev/null +++ b/oracle/provider/balancer.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + balancerWSHost = "api.eth-api.prod.ojo.network" + balancerWSPath = "/balancer/ws" + balancerWSScheme = "wss" + balancerRestHost = "https://api.eth-api.prod.ojo.network" + balancerRestPath = "/balancer/assetpairs" + balancerAckMsg = "ack" +) + +var _ Provider = (*BalancerProvider)(nil) + +type ( + // BalancerProvider defines an Oracle provider implemented by OJO's + // Balancer API. + // + // REF: https://github.com/ojo-network/ehereum-api + BalancerProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + BalancerTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + BalancerCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // BalancerPairsSummary defines the response structure for an Balancer pairs + // summary. + BalancerPairsSummary struct { + Data []BalancerPairData `json:"data"` + } + + // BalancerPairData defines the data response structure for an Balancer pair. + BalancerPairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewBalancerProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*BalancerProvider, error) { + if endpoints.Name != ProviderEthBalancer { + endpoints = Endpoint{ + Name: ProviderEthBalancer, + Rest: balancerRestHost, + Websocket: balancerWSHost, + } + } + + wsURL := url.URL{ + Scheme: balancerWSScheme, + Host: endpoints.Websocket, + Path: balancerWSPath, + } + + balancerLogger := logger.With().Str("provider", "balancer").Logger() + + provider := &BalancerProvider{ + wsURL: wsURL, + logger: balancerLogger, + endpoints: endpoints, + priceStore: newPriceStore(balancerLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToBalancerPair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + balancerLogger, + ) + + return provider, nil +} + +func (p *BalancerProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *BalancerProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *BalancerProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == balancerAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp BalancerTicker + tickerErr error + candleResp []BalancerCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + balancerPair := currencyPairToBalancerPair(pair) + if msg, ok := messageResp[balancerPair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + balancerPair, + ) + telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + balancerPair, + ) + } + telemetryWebsocketMessage(ProviderEthBalancer, MessageTypeCandle) + continue + } + } + } +} + +func (o BalancerTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("balancer: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o BalancerCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("balancer: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *BalancerProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *BalancerProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + balancerRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []BalancerPairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToBalancerPair receives a currency pair and return balancer +// ticker symbol atomusdt@ticker. +func currencyPairToBalancerPair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/balancer_test.go b/oracle/provider/balancer_test.go new file mode 100644 index 00000000..5266653e --- /dev/null +++ b/oracle/provider/balancer_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestBalancerProvider_GetTickerPrices(t *testing.T) { + p, err := NewBalancerProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestBalancerProvider_GetCandlePrices(t *testing.T) { + p, err := NewBalancerProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := BalancerCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestBalancerCurrencyPairToBalancerPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + balancerSymbol := currencyPairToBalancerPair(cp) + require.Equal(t, balancerSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/camelot.go b/oracle/provider/camelot.go new file mode 100644 index 00000000..eb0b669a --- /dev/null +++ b/oracle/provider/camelot.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + camelotWSHost = "api.eth-api.prod.ojo.network" + camelotWSPath = "/camelot/ws" + camelotWSScheme = "wss" + camelotRestHost = "https://api.eth-api.prod.ojo.network" + camelotRestPath = "/camelot/assetpairs" + camelotAckMsg = "ack" +) + +var _ Provider = (*CamelotProvider)(nil) + +type ( + // CamelotProvider defines an Oracle provider implemented by OJO's + // Camelot API. + // + // REF: https://github.com/ojo-network/ehereum-api + CamelotProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + CamelotTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + CamelotCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // CamelotPairsSummary defines the response structure for an Camelot pairs + // summary. + CamelotPairsSummary struct { + Data []CamelotPairData `json:"data"` + } + + // CamelotPairData defines the data response structure for an Camelot pair. + CamelotPairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewCamelotProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*CamelotProvider, error) { + if endpoints.Name != ProviderEthCamelot { + endpoints = Endpoint{ + Name: ProviderEthCamelot, + Rest: camelotRestHost, + Websocket: camelotWSHost, + } + } + + wsURL := url.URL{ + Scheme: camelotWSScheme, + Host: endpoints.Websocket, + Path: camelotWSPath, + } + + camelotLogger := logger.With().Str("provider", "camelot").Logger() + + provider := &CamelotProvider{ + wsURL: wsURL, + logger: camelotLogger, + endpoints: endpoints, + priceStore: newPriceStore(camelotLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToCamelotPair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + camelotLogger, + ) + + return provider, nil +} + +func (p *CamelotProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *CamelotProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *CamelotProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == camelotAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp CamelotTicker + tickerErr error + candleResp []CamelotCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + camelotPair := currencyPairToCamelotPair(pair) + if msg, ok := messageResp[camelotPair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + camelotPair, + ) + telemetryWebsocketMessage(ProviderEthCamelot, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + camelotPair, + ) + } + telemetryWebsocketMessage(ProviderEthCamelot, MessageTypeCandle) + continue + } + } + } +} + +func (o CamelotTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("camelot: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("camelot: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o CamelotCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("camelot: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("camelot: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *CamelotProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *CamelotProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + camelotRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []CamelotPairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToCamelotPair receives a currency pair and return camelot +// ticker symbol atomusdt@ticker. +func currencyPairToCamelotPair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/camelot_test.go b/oracle/provider/camelot_test.go new file mode 100644 index 00000000..bd460093 --- /dev/null +++ b/oracle/provider/camelot_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestCamelotProvider_GetTickerPrices(t *testing.T) { + p, err := NewCamelotProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCamelotProvider_GetCandlePrices(t *testing.T) { + p, err := NewCamelotProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := CamelotCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCamelotCurrencyPairToCamelotPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + camelotSymbol := currencyPairToCamelotPair(cp) + require.Equal(t, camelotSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/curve.go b/oracle/provider/curve.go new file mode 100644 index 00000000..76086143 --- /dev/null +++ b/oracle/provider/curve.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + curveWSHost = "api.eth-api.prod.ojo.network" + curveWSPath = "/curve/ws" + curveWSScheme = "wss" + curveRestHost = "https://api.eth-api.prod.ojo.network" + curveRestPath = "/curve/assetpairs" + curveAckMsg = "ack" +) + +var _ Provider = (*CurveProvider)(nil) + +type ( + // CurveProvider defines an Oracle provider implemented by OJO's + // Curve API. + // + // REF: https://github.com/ojo-network/ehereum-api + CurveProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + CurveTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + CurveCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // CurvePairsSummary defines the response structure for an Curve pairs + // summary. + CurvePairsSummary struct { + Data []CurvePairData `json:"data"` + } + + // CurvePairData defines the data response structure for an Curve pair. + CurvePairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewCurveProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*CurveProvider, error) { + if endpoints.Name != ProviderEthCurve { + endpoints = Endpoint{ + Name: ProviderEthCurve, + Rest: curveRestHost, + Websocket: curveWSHost, + } + } + + wsURL := url.URL{ + Scheme: curveWSScheme, + Host: endpoints.Websocket, + Path: curveWSPath, + } + + curveLogger := logger.With().Str("provider", "curve").Logger() + + provider := &CurveProvider{ + wsURL: wsURL, + logger: curveLogger, + endpoints: endpoints, + priceStore: newPriceStore(curveLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToCurvePair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + curveLogger, + ) + + return provider, nil +} + +func (p *CurveProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *CurveProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *CurveProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == curveAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp CurveTicker + tickerErr error + candleResp []CurveCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + curvePair := currencyPairToCurvePair(pair) + if msg, ok := messageResp[curvePair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + curvePair, + ) + telemetryWebsocketMessage(ProviderEthCurve, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + curvePair, + ) + } + telemetryWebsocketMessage(ProviderEthCurve, MessageTypeCandle) + continue + } + } + } +} + +func (o CurveTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("curve: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("curve: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o CurveCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("curve: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("curve: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *CurveProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *CurveProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + curveRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []CurvePairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToCurvePair receives a currency pair and return curve +// ticker symbol atomusdt@ticker. +func currencyPairToCurvePair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/curve_test.go b/oracle/provider/curve_test.go new file mode 100644 index 00000000..4c9a8998 --- /dev/null +++ b/oracle/provider/curve_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestCurveProvider_GetTickerPrices(t *testing.T) { + p, err := NewCurveProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCurveProvider_GetCandlePrices(t *testing.T) { + p, err := NewCurveProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := CurveCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestCurveCurrencyPairToCurvePair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + curveSymbol := currencyPairToCurvePair(cp) + require.Equal(t, curveSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/pancake.go b/oracle/provider/pancake.go new file mode 100644 index 00000000..999da952 --- /dev/null +++ b/oracle/provider/pancake.go @@ -0,0 +1,293 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/math" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +const ( + pancakeWSHost = "api.eth-api.prod.ojo.network" + pancakeWSPath = "/pancake/ws" + pancakeWSScheme = "wss" + pancakeRestHost = "https://api.eth-api.prod.ojo.network" + pancakeRestPath = "/pancake/assetpairs" + pancakeAckMsg = "ack" +) + +var _ Provider = (*PancakeProvider)(nil) + +type ( + // PancakeProvider defines an Oracle provider implemented by OJO's + // Pancake API. + // + // REF: https://github.com/ojo-network/ehereum-api + PancakeProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + priceStore + } + + PancakeTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` + } + + PancakeCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` + } + + // PancakePairsSummary defines the response structure for an Pancake pairs + // summary. + PancakePairsSummary struct { + Data []PancakePairData `json:"data"` + } + + // PancakePairData defines the data response structure for an Pancake pair. + PancakePairData struct { + Base string `json:"base"` + Quote string `json:"quote"` + } +) + +func NewPancakeProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*PancakeProvider, error) { + if endpoints.Name != ProviderEthPancake { + endpoints = Endpoint{ + Name: ProviderEthPancake, + Rest: pancakeRestHost, + Websocket: pancakeWSHost, + } + } + + wsURL := url.URL{ + Scheme: pancakeWSScheme, + Host: endpoints.Websocket, + Path: pancakeWSPath, + } + + pancakeLogger := logger.With().Str("provider", "pancake").Logger() + + provider := &PancakeProvider{ + wsURL: wsURL, + logger: pancakeLogger, + endpoints: endpoints, + priceStore: newPriceStore(pancakeLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToPancakePair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + provider.setSubscribedPairs(confirmedPairs...) + + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + pancakeLogger, + ) + + return provider, nil +} + +func (p *PancakeProvider) StartConnections() { + p.wsc.StartConnections() +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *PancakeProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +func (p *PancakeProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == pancakeAckMsg { + return + } + + var ( + messageResp map[string]interface{} + messageErr error + tickerResp PancakeTicker + tickerErr error + candleResp []PancakeCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } + + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + pancakePair := currencyPairToPancakePair(pair) + if msg, ok := messageResp[pancakePair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + pancakePair, + ) + telemetryWebsocketMessage(ProviderEthPancake, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + pancakePair, + ) + } + telemetryWebsocketMessage(ProviderEthPancake, MessageTypeCandle) + continue + } + } + } +} + +func (o PancakeTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := math.LegacyNewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("pancake: failed to parse ticker price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("pancake: failed to parse ticker volume: %w", err) + } + + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil +} + +func (o PancakeCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := math.LegacyNewDecFromStr(o.Close) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("pancake: failed to parse candle price: %w", err) + } + volume, err := math.LegacyNewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("pancake: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *PancakeProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *PancakeProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + pancakeRestPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var pairsSummary []PancakePairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } + + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, + } + availablePairs[strings.ToUpper(cp.String())] = struct{}{} + } + + return availablePairs, nil +} + +// currencyPairToPancakePair receives a currency pair and return pancake +// ticker symbol atomusdt@ticker. +func currencyPairToPancakePair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote +} diff --git a/oracle/provider/pancake_test.go b/oracle/provider/pancake_test.go new file mode 100644 index 00000000..a1a38c33 --- /dev/null +++ b/oracle/provider/pancake_test.go @@ -0,0 +1,116 @@ +package provider + +import ( + "context" + "testing" + + "cosmossdk.io/math" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestPancakeProvider_GetTickerPrices(t *testing.T) { + p, err := NewPancakeProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := math.LegacyMustNewDecFromStr("34.69000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) + + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := math.LegacyMustNewDecFromStr("34.69000000") + lastPriceLuna := math.LegacyMustNewDecFromStr("41.35000000") + volume := math.LegacyMustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestPancakeProvider_GetCandlePrices(t *testing.T) { + p, err := NewPancakeProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := PancakeCandle{ + Volume: volume, + Close: price, + EndTime: time, + } + + p.setCandlePair(candle, "OSMO/ATOM") + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, math.LegacyMustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, math.LegacyMustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestPancakeCurrencyPairToPancakePair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + pancakeSymbol := currencyPairToPancakePair(cp) + require.Equal(t, pancakeSymbol, "ATOM/USDT") +} diff --git a/oracle/provider/provider.go b/oracle/provider/provider.go index 5e954c3c..e5611dd9 100644 --- a/oracle/provider/provider.go +++ b/oracle/provider/provider.go @@ -9,21 +9,25 @@ import ( const ( defaultTimeout = 10 * time.Second - ProviderKraken types.ProviderName = "kraken" - ProviderBinance types.ProviderName = "binance" - ProviderBinanceUS types.ProviderName = "binanceus" - ProviderOsmosis types.ProviderName = "osmosis" - ProviderHuobi types.ProviderName = "huobi" - ProviderOkx types.ProviderName = "okx" - ProviderGate types.ProviderName = "gate" - ProviderCoinbase types.ProviderName = "coinbase" - ProviderBitget types.ProviderName = "bitget" - ProviderMexc types.ProviderName = "mexc" - ProviderCrypto types.ProviderName = "crypto" - ProviderPolygon types.ProviderName = "polygon" - ProviderEthUniswap types.ProviderName = "eth-uniswap" - ProviderKujira types.ProviderName = "kujira" - ProviderMock types.ProviderName = "mock" + ProviderKraken types.ProviderName = "kraken" + ProviderBinance types.ProviderName = "binance" + ProviderBinanceUS types.ProviderName = "binanceus" + ProviderOsmosis types.ProviderName = "osmosis" + ProviderHuobi types.ProviderName = "huobi" + ProviderOkx types.ProviderName = "okx" + ProviderGate types.ProviderName = "gate" + ProviderCoinbase types.ProviderName = "coinbase" + ProviderBitget types.ProviderName = "bitget" + ProviderMexc types.ProviderName = "mexc" + ProviderCrypto types.ProviderName = "crypto" + ProviderPolygon types.ProviderName = "polygon" + ProviderEthUniswap types.ProviderName = "eth-uniswap" + ProviderEthCamelot types.ProviderName = "eth-camelot" + ProviderEthBalancer types.ProviderName = "eth-balancer" + ProviderEthPancake types.ProviderName = "eth-pancake" + ProviderEthCurve types.ProviderName = "eth-curve" + ProviderKujira types.ProviderName = "kujira" + ProviderMock types.ProviderName = "mock" ) var ( diff --git a/oracle/provider/uniswap.go b/oracle/provider/uniswap.go index 586305fd..7f268665 100644 --- a/oracle/provider/uniswap.go +++ b/oracle/provider/uniswap.go @@ -17,10 +17,10 @@ import ( const ( uniswapWSHost = "api.eth-api.prod.ojo.network" - uniswapWSPath = "ws" + uniswapWSPath = "/uniswap/ws" uniswapWSScheme = "wss" uniswapRestHost = "https://api.eth-api.prod.ojo.network" - uniswapRestPath = "/assetpairs" + uniswapRestPath = "/uniswap/assetpairs" uniswapAckMsg = "ack" )