From 9ca454596436484528ff9fc3f283074227c89687 Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 13 Nov 2023 13:45:47 -0700 Subject: [PATCH] feat: Update uniswap provider to use new websocket api (#308) * replace uniswap provider with new websocket api --- .golangci.yml | 3 + ojo-provider-config/currency-pairs.toml | 39 +- ojo-provider-config/endpoints.toml | 5 - oracle/oracle.go | 2 +- oracle/provider/uniswap.go | 584 +++++++++--------------- oracle/provider/uniswap_test.go | 378 ++++----------- tests/integration/provider_test.go | 5 +- 7 files changed, 330 insertions(+), 686 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 4ed2fc24..04668825 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -51,6 +51,9 @@ issues: - path: _test\.go linters: - gosec + - path: oracle/provider/* + linters: + - gosec - linters: - lll source: "https://" diff --git a/ojo-provider-config/currency-pairs.toml b/ojo-provider-config/currency-pairs.toml index 17cd616d..ef7a940a 100644 --- a/ojo-provider-config/currency-pairs.toml +++ b/ojo-provider-config/currency-pairs.toml @@ -99,55 +99,49 @@ base = "WETH" providers = [ "eth-uniswap" ] - quote = "USDC" -[[currency_pairs.pair_address_providers]] -address = "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640" -provider = "eth-uniswap" [[currency_pairs]] base = "WBTC" providers = [ "eth-uniswap" ] - quote = "WETH" -[[currency_pairs.pair_address_providers]] -address = "0xcbcdf9626bc03e24f779434178a73a0b4bad62ed" -provider = "eth-uniswap" [[currency_pairs]] base = "CBETH" providers = [ "eth-uniswap" ] - quote = "WETH" -[[currency_pairs.pair_address_providers]] -address = "0x840deeef2f115cf50da625f7368c24af6fe74410" -provider = "eth-uniswap" [[currency_pairs]] base = "LINK" providers = [ "eth-uniswap" ] - quote = "WETH" -[[currency_pairs.pair_address_providers]] -address = "0xa6cc3c2531fdaa6ae1a3ca84c2855806728693e8" -provider = "eth-uniswap" [[currency_pairs]] base = "RETH" providers = [ "eth-uniswap", ] +quote = "WETH" +[[currency_pairs]] +base = "SFRXETH" +providers = [ + "eth-uniswap", +] +quote = "WETH" + +[[currency_pairs]] +base = "WSTETH" +providers = [ + "eth-uniswap", +] quote = "WETH" -[[currency_pairs.pair_address_providers]] -address = "0xa4e0faA58465A2D369aa21B3e42d43374c6F9613" -provider = "eth-uniswap" [[currency_pairs]] base = "stATOM" @@ -164,13 +158,6 @@ providers = [ ] quote = "OSMO" -[[currency_pairs]] -base = "CMST" -providers = [ - "crescent", -] -quote = "USDC" - [[currency_pairs]] base = "DAI" providers = [ diff --git a/ojo-provider-config/endpoints.toml b/ojo-provider-config/endpoints.toml index 7bf2e7ed..3a393b49 100644 --- a/ojo-provider-config/endpoints.toml +++ b/ojo-provider-config/endpoints.toml @@ -2,8 +2,3 @@ name = "binance" rest = "https://api1.binance.com" websocket = "stream.binance.com:9443" - -[[provider_endpoints]] -name = "eth-uniswap" -rest = "http://104.197.233.185:8000/subgraphs/name/ojo-network/unidexer" -websocket = "not supported" diff --git a/oracle/oracle.go b/oracle/oracle.go index e6f10798..a225a9c9 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -473,7 +473,7 @@ func NewProvider( return provider.NewMockProvider(), nil case provider.ProviderEthUniswap: - return provider.NewUniswapProvider(ctx, logger, providerName.String(), endpoint, providerPairs...), nil + return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) } return nil, fmt.Errorf("provider %s not found", providerName) diff --git a/oracle/provider/uniswap.go b/oracle/provider/uniswap.go index f1bf1839..51f1fa1f 100644 --- a/oracle/provider/uniswap.go +++ b/oracle/provider/uniswap.go @@ -2,438 +2,292 @@ package provider import ( "context" + "encoding/json" "fmt" - "strconv" + "net/http" + "net/url" "strings" "sync" - "time" sdk "github.com/cosmos/cosmos-sdk/types" - gql "github.com/hasura/go-graphql-client" + "github.com/gorilla/websocket" + "github.com/ojo-network/price-feeder/oracle/types" "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" +) - "github.com/ojo-network/price-feeder/oracle/types" +const ( + uniswapWSHost = "api.eth-api.prod.ojo.network" + uniswapWSPath = "ws" + uniswapWSScheme = "wss" + uniswapRestHost = "https://api.eth-api.prod.ojo.network" + uniswapRestPath = "/assetpairs" + uniswapAckMsg = "ack" ) var _ Provider = (*UniswapProvider)(nil) -const USDC = "USDC" - type ( + // UniswapProvider defines an Oracle provider implemented by OJO's + // Uniswap API. + // + // REF: https://github.com/ojo-network/ehereum-api + UniswapProvider struct { + wsc *WebsocketController + wsURL url.URL + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint - // BundleQuery eth price query has fixed id of 1 - BundleQuery struct { - Bundle struct { - EthPriceUSD string `graphql:"ethPriceUSD"` - ID string `graphql:"id"` - } `graphql:"bundle(id: \"1\")"` + priceStore } - Token struct { - Name string `graphql:"name"` - Symbol string `graphql:"symbol"` + UniswapTicker struct { + Price string `json:"Price"` + Volume string `json:"Volume"` } - PoolMinuteDataCandleQuery struct { - PoolMinuteDatas []struct { - ID string `graphql:"id"` - PoolID string `graphql:"poolID"` - PeriodStartUnix float64 `graphql:"periodStartUnix"` - Timestamp float64 `graphql:"timestamp"` - Token0 Token `graphql:"token0"` - Token1 Token `graphql:"token1"` - Token0Price string `graphql:"token0Price"` - Token0PriceUSD string `graphql:"token0PriceUSD"` - Token1PriceUSD string `graphql:"token1PriceUSD"` - Token1Price string `graphql:"token1Price"` - VolumeUSDTracked string `graphql:"volumeUSDTracked"` - VolumeUSDUntracked string `graphql:"volumeUSDUntracked"` - Token0Volume string `graphql:"token0Volume"` - Token1Volume string `graphql:"token1Volume"` - } `graphql:"poolMinuteDatas(first:$first, after:$after, orderBy: periodStartUnix, orderDirection: asc, where: {poolID_in: $poolIDS, periodStartUnix_gte: $start,periodStartUnix_lte:$stop})"` //nolint:lll + UniswapCandle struct { + Close string `json:"Close"` + Volume string `json:"Volume"` + EndTime int64 `json:"EndTime"` } - PoolHourDataQuery struct { - PoolHourDatas []struct { - ID string `graphql:"id"` - PoolID string `graphql:"poolID"` - PeriodStartUnix float64 `graphql:"periodStartUnix"` - Timestamp float64 `graphql:"timestamp"` - Token0 Token `graphql:"token0"` - Token1 Token `graphql:"token1"` - Token0Price string `graphql:"token0Price"` - Token1Price string `graphql:"token1Price"` - Token0PriceUSD string `graphql:"token0PriceUSD"` - Token1PriceUSD string `graphql:"token1PriceUSD"` - VolumeUSDTracked string `graphql:"volumeUSDTracked"` - VolumeUSDUntracked string `graphql:"volumeUSDUntracked"` - Token0Volume string `graphql:"token0Volume"` - Token1Volume string `graphql:"token1Volume"` - } `graphql:"poolHourDatas(first: $first,after: $after, orderBy: periodStartUnix, orderDirection: desc, where: {poolID_in: $poolIDS, periodStartUnix_gte:$start,periodStartUnix_lte:$stop})"` //nolint:lll + // UniswapPairsSummary defines the response structure for an Uniswap pairs + // summary. + UniswapPairsSummary struct { + Data []UniswapPairData `json:"data"` } - // UniswapProvider defines an Oracle provider implemented to consume data from Uniswap graphql - UniswapProvider struct { - logger zerolog.Logger - baseURL string - // support concurrent quries - tickerClient *gql.Client - candleClient *gql.Client - mut sync.Mutex - - poolIDS []string - pairs []types.CurrencyPair - denomToAddress map[string]string - addressToPair map[string]types.CurrencyPair - poolsHoursDatas PoolHourDataQuery - poolsMinuteDatas PoolMinuteDataCandleQuery + // UniswapPairData defines the data response structure for an Uniswap pair. + UniswapPairData struct { + Base string `json:"base"` + Quote string `json:"quote"` } ) func NewUniswapProvider( ctx context.Context, logger zerolog.Logger, - providerName string, - endpoint Endpoint, - currencyPairs ...types.CurrencyPair, -) *UniswapProvider { - // create pair name to address map - denomToAddress := make(map[string]string) - addressToPair := make(map[string]types.CurrencyPair) - for _, pair := range currencyPairs { - // graph supports all lower case id's - // currently supports only 1 fee tier pool per currency pair - address := strings.ToLower(pair.Address) - denomToAddress[pair.String()] = address - addressToPair[address] = pair + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*UniswapProvider, error) { + if endpoints.Name != ProviderEthUniswap { + endpoints = Endpoint{ + Name: ProviderEthUniswap, + Rest: uniswapRestHost, + Websocket: uniswapWSHost, + } } - // default provider to eth uniswap - uniswapLogger := logger.With().Str("provider", providerName).Logger() - provider := &UniswapProvider{ - baseURL: endpoint.Rest, - tickerClient: gql.NewClient(endpoint.Rest, nil), - candleClient: gql.NewClient(endpoint.Rest, nil), - denomToAddress: denomToAddress, - addressToPair: addressToPair, - logger: uniswapLogger, - pairs: currencyPairs, - mut: sync.Mutex{}, + wsURL := url.URL{ + Scheme: uniswapWSScheme, + Host: endpoints.Websocket, + Path: uniswapWSPath, } - go provider.startPooling(ctx) + uniswapLogger := logger.With().Str("provider", "uniswap").Logger() - return provider -} - -func (p *UniswapProvider) startPooling(ctx context.Context) { - tick := 0 - err := p.setPoolIDS() + provider := &UniswapProvider{ + wsURL: wsURL, + logger: uniswapLogger, + endpoints: endpoints, + priceStore: newPriceStore(uniswapLogger), + } + provider.setCurrencyPairToTickerAndCandlePair(currencyPairToUniswapPair) + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) if err != nil { - p.logger.Err(err).Msg("error generating pool ids") - return + return nil, err } - for { - select { - case <-ctx.Done(): - return - - default: - if err := p.getHourAndMinuteData(ctx); err != nil { - p.logger.Err(err).Msgf("failed to get hour and minute data") - } + provider.setSubscribedPairs(confirmedPairs...) - tick++ - p.logger.Log().Int("uniswap tick", tick) + provider.wsc = NewWebsocketController( + ctx, + endpoints.Name, + wsURL, + []interface{}{""}, + provider.messageReceived, + defaultPingDuration, + websocket.PingMessage, + uniswapLogger, + ) - // slightly larger than a second (time for new candle data to be populated) - time.Sleep(time.Millisecond * 1100) - } - } + return provider, nil } + func (p *UniswapProvider) StartConnections() { - // no-op Uniswap v1 does not use websockets + p.wsc.StartConnections() } -func (p *UniswapProvider) getHourAndMinuteData(ctx context.Context) error { - g, ctx := errgroup.WithContext(ctx) - - // ticker prices - g.Go(func() error { - idMap := map[string]interface{}{ - "poolIDS": p.poolIDS, - "start": time.Now().Unix() - 86400, - "stop": time.Now().Unix(), - } - - var lastID string - var firstID string - var poolsHourDatas PoolHourDataQuery - for { - // limit by graph - idMap["first"] = 1000 - idMap["after"] = lastID - - // query volume from day data - var poolsHourData PoolHourDataQuery - err := p.tickerClient.Query(ctx, &poolsHourData, idMap) - if err != nil { - return err - } - - // check if no new id or repeated id - if len(poolsHourData.PoolHourDatas) == 0 || firstID == poolsHourData.PoolHourDatas[0].ID { - break - } - - firstID = poolsHourData.PoolHourDatas[0].ID - lastID = poolsHourData.PoolHourDatas[len(poolsHourData.PoolHourDatas)-1].ID - - // append poolsHourDatas - poolsHourDatas.PoolHourDatas = append(poolsHourDatas.PoolHourDatas, poolsHourData.PoolHourDatas...) - } - - p.mut.Lock() - p.poolsHoursDatas = poolsHourDatas - p.mut.Unlock() - - return nil - }) - - // candle prices - g.Go(func() error { - idMap := map[string]interface{}{ - "poolIDS": p.poolIDS, - "start": time.Now().Unix() - int64((30 * time.Minute).Seconds()), - "stop": time.Now().Unix(), - } - - var lastID string - var firstID string - var poolsMinuteDatas PoolMinuteDataCandleQuery - for { - // limit by graph - idMap["first"] = 1000 - idMap["after"] = lastID - - // query volume from day data - var poolsMinuteData PoolMinuteDataCandleQuery - err := p.candleClient.Query(ctx, &poolsMinuteData, idMap) - if err != nil { - return err - } - - // check if no new id or repeated id - if len(poolsMinuteData.PoolMinuteDatas) == 0 || firstID == poolsMinuteData.PoolMinuteDatas[0].ID { - break - } - - firstID = poolsMinuteData.PoolMinuteDatas[0].ID - lastID = poolsMinuteData.PoolMinuteDatas[len(poolsMinuteData.PoolMinuteDatas)-1].ID - - poolsMinuteDatas.PoolMinuteDatas = append(poolsMinuteDatas.PoolMinuteDatas, poolsMinuteData.PoolMinuteDatas...) - } - - p.mut.Lock() - p.poolsMinuteDatas = poolsMinuteDatas - p.mut.Unlock() - - return nil - }) +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *UniswapProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + cps..., + ) + if err != nil { + return + } - err := g.Wait() - return err + p.setSubscribedPairs(confirmedPairs...) } -// SubscribeCurrencyPairs performs a no-op since Uniswap does not use websockets -func (p *UniswapProvider) SubscribeCurrencyPairs(...types.CurrencyPair) {} - -func (p *UniswapProvider) GetTickerPrices(_ ...types.CurrencyPair) (types.CurrencyPairTickers, error) { - tickerPrices := make(types.CurrencyPairTickers) - latestTimestamp := make(map[string]float64) - - p.mut.Lock() - poolHourDatas := p.poolsHoursDatas - p.mut.Unlock() - - for _, poolData := range poolHourDatas.PoolHourDatas { - symbol0 := strings.ToUpper(poolData.Token0.Symbol) - symbol1 := strings.ToUpper(poolData.Token1.Symbol) - - // check if this pair is request - requestedPair, found := p.addressToPair[strings.ToLower(poolData.PoolID)] - if !found { - continue - } - - base := requestedPair.Base - quote := requestedPair.Quote - name := requestedPair.String() - var tokenPrice string - var tokenVolume string - switch { - case base == symbol0 && quote == symbol1: - tokenPrice = poolData.Token1Price - tokenVolume = poolData.Token0Volume - - case base == symbol0 && (quote == USDC && symbol1 != USDC): - // consider USDC BASED PRICING - tokenPrice = poolData.Token0PriceUSD - tokenVolume = poolData.VolumeUSDTracked - - case base == symbol1 && quote == symbol0: - // flip prices - tokenPrice = poolData.Token0Price - tokenVolume = poolData.Token1Volume - - case base == symbol1 && (quote == USDC && symbol0 != USDC): - // consider USDC BASED PRICING - tokenPrice = poolData.Token1PriceUSD - tokenVolume = poolData.VolumeUSDTracked - - default: - return nil, fmt.Errorf("price conversion error, pair %s and quote %s mismatch", base, quote) - } - - price, err := toSdkDec(tokenPrice) - if err != nil { - return nil, err - } +func (p *UniswapProvider) messageReceived(_ int, _ *WebsocketConnection, bz []byte) { + // check if message is an ack + if string(bz) == uniswapAckMsg { + return + } - timestamp := poolData.PeriodStartUnix - vol, err := toSdkDec(tokenVolume) - if err != nil { - return nil, err - } + var ( + messageResp map[string]interface{} + messageErr error + tickerResp UniswapTicker + tickerErr error + candleResp []UniswapCandle + candleErr error + ) + + messageErr = json.Unmarshal(bz, &messageResp) + if messageErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("message", messageErr). + Msg("Error on receive message") + } - // update price according to latest timestamp - if timestamp > latestTimestamp[name] { - latestTimestamp[name] = timestamp - if _, found := tickerPrices[requestedPair]; !found { - tickerPrices[requestedPair] = types.TickerPrice{Price: price, Volume: sdk.ZeroDec()} - } else { - tickerPrices[requestedPair].Price.Set(price) + // Check the response for currency pairs that the provider is subscribed + // to and determine whether it is a ticker or candle. + for _, pair := range p.subscribedPairs { + uniswapPair := currencyPairToUniswapPair(pair) + if msg, ok := messageResp[uniswapPair]; ok { + switch v := msg.(type) { + // ticker response + case map[string]interface{}: + tickerString, _ := json.Marshal(v) + tickerErr = json.Unmarshal(tickerString, &tickerResp) + if tickerErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("ticker", tickerErr). + Msg("Error on receive message") + continue + } + p.setTickerPair( + tickerResp, + uniswapPair, + ) + telemetryWebsocketMessage(ProviderEthUniswap, MessageTypeTicker) + continue + + // candle response + case []interface{}: + // use latest candlestick in list if there is one + if len(v) == 0 { + continue + } + candleString, _ := json.Marshal(v) + candleErr = json.Unmarshal(candleString, &candleResp) + if candleErr != nil { + p.logger.Error(). + Int("length", len(bz)). + AnErr("candle", candleErr). + Msg("Error on receive message") + continue + } + for _, singleCandle := range candleResp { + p.setCandlePair( + singleCandle, + uniswapPair, + ) + } + telemetryWebsocketMessage(ProviderEthUniswap, MessageTypeCandle) + continue } } - - tickerPrices[requestedPair].Volume.Set(tickerPrices[requestedPair].Volume.Add(vol)) } - - return tickerPrices, nil } -func (p *UniswapProvider) GetCandlePrices(_ ...types.CurrencyPair) (types.CurrencyPairCandles, error) { - p.mut.Lock() - poolsMinuteDatas := p.poolsMinuteDatas - p.mut.Unlock() - - candlePrices := make(types.CurrencyPairCandles) - for _, poolData := range poolsMinuteDatas.PoolMinuteDatas { - symbol0 := strings.ToUpper(poolData.Token0.Symbol) // symbol == base in a currency pair - symbol1 := strings.ToUpper(poolData.Token1.Symbol) // symbol == quote in a currency pai// r - - // check if this pair is request - requestedPair, found := p.addressToPair[strings.ToLower(poolData.PoolID)] - if !found { - continue - } - - base := requestedPair.Base - quote := requestedPair.Quote - var tokenPrice string - var tokenVolume string - switch { - case base == symbol0 && quote == symbol1: - // pricing - tokenPrice = poolData.Token1Price - tokenVolume = poolData.Token0Volume - - case base == symbol0 && (quote == USDC && symbol1 != USDC): - // consider USDC BASED PRICING - tokenPrice = poolData.Token0PriceUSD - tokenVolume = poolData.VolumeUSDTracked - - case base == symbol1 && quote == symbol0: - // flip prices here - tokenPrice = poolData.Token0Price - tokenVolume = poolData.Token1Volume - - case base == symbol1 && (quote == USDC && symbol0 != USDC): - // consider USDC BASED PRICING - tokenPrice = poolData.Token1PriceUSD - tokenVolume = poolData.VolumeUSDTracked - - default: - return nil, fmt.Errorf("price conversion error, pair %s and quote %s mismatch", base, quote) - } - price, err := toSdkDec(tokenPrice) - if err != nil { - return nil, err - } - - vol, err := toSdkDec(tokenVolume) - if err != nil { - return nil, err - } - - // second to millisecond for filtering - candlePrices[requestedPair] = append( - candlePrices[requestedPair], types.CandlePrice{ - Price: price, - Volume: vol, - TimeStamp: int64(poolData.Timestamp * 1000), - }, - ) +func (o UniswapTicker) toTickerPrice() (types.TickerPrice, error) { + price, err := sdk.NewDecFromStr(o.Price) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("uniswap: failed to parse ticker price: %w", err) + } + volume, err := sdk.NewDecFromStr(o.Volume) + if err != nil { + return types.TickerPrice{}, fmt.Errorf("uniswap: failed to parse ticker volume: %w", err) } - return candlePrices, nil + tickerPrice := types.TickerPrice{ + Price: price, + Volume: volume, + } + return tickerPrice, nil } -// GetBundle returns eth price -func (p *UniswapProvider) GetBundle() (float64, error) { - var bundle BundleQuery - err := p.tickerClient.Query(context.Background(), &bundle, nil) +func (o UniswapCandle) toCandlePrice() (types.CandlePrice, error) { + close, err := sdk.NewDecFromStr(o.Close) if err != nil { - return 0, err + return types.CandlePrice{}, fmt.Errorf("uniswap: failed to parse candle price: %w", err) } - - return strconv.ParseFloat(bundle.Bundle.EthPriceUSD, 64) + volume, err := sdk.NewDecFromStr(o.Volume) + if err != nil { + return types.CandlePrice{}, fmt.Errorf("uniswap: failed to parse candle volume: %w", err) + } + candlePrice := types.CandlePrice{ + Price: close, + Volume: volume, + TimeStamp: o.EndTime, + } + return candlePrice, nil } -// GetAvailablePairs return all available pairs symbol to susbscribe. -func (p *UniswapProvider) GetAvailablePairs() (map[string]struct{}, error) { - availablePairs := make(map[string]struct{}) - - // return denoms that is tracked at provider init - for denom := range p.denomToAddress { - availablePairs[denom] = struct{}{} //nolint:structcheck +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *UniswapProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp } - - return availablePairs, nil } -func toSdkDec(value string) (sdk.Dec, error) { - valueFloat, err := strconv.ParseFloat(value, 64) +// GetAvailablePairs returns all pairs to which the provider can subscribe. +// ex.: map["ATOMUSDT" => {}, "OJOUSDC" => {}]. +func (p *UniswapProvider) GetAvailablePairs() (map[string]struct{}, error) { + resp, err := http.Get(p.endpoints.Rest + uniswapRestPath) if err != nil { - return sdk.ZeroDec(), err + return nil, err } + defer resp.Body.Close() - return sdk.NewDecFromStr(fmt.Sprintf("%.18f", valueFloat)) -} + var pairsSummary []UniswapPairData + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return nil, err + } -func (p *UniswapProvider) setPoolIDS() error { - poolIDS := make([]string, len(p.pairs)) - for i, pair := range p.pairs { - if _, found := p.denomToAddress[pair.String()]; !found { - return fmt.Errorf("pool id for %s not found", pair.String()) + availablePairs := make(map[string]struct{}, len(pairsSummary)) + for _, pair := range pairsSummary { + cp := types.CurrencyPair{ + Base: pair.Base, + Quote: pair.Quote, } - - poolID := p.denomToAddress[pair.String()] - poolIDS[i] = poolID + availablePairs[strings.ToUpper(cp.String())] = struct{}{} } - p.poolIDS = poolIDS - return nil + return availablePairs, nil +} + +// currencyPairToUniswapPair receives a currency pair and return uniswap +// ticker symbol atomusdt@ticker. +func currencyPairToUniswapPair(cp types.CurrencyPair) string { + return cp.Base + "/" + cp.Quote } diff --git a/oracle/provider/uniswap_test.go b/oracle/provider/uniswap_test.go index 0cdfb350..a0cf7611 100644 --- a/oracle/provider/uniswap_test.go +++ b/oracle/provider/uniswap_test.go @@ -2,311 +2,115 @@ package provider import ( "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "os" - "strconv" - "strings" - "sync" "testing" - "time" - "github.com/cometbft/cometbft/libs/rand" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/hasura/go-graphql-client" - "github.com/rs/zerolog" - "github.com/stretchr/testify/suite" - "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" ) -type Bundle struct { - EthPriceUSD string `json:"ethPriceUSD"` - ID string `json:"id"` -} - -type BundleResponse struct { - Data struct { - Bundle Bundle `json:"bundle"` - } `json:"data"` -} - -type Tokens struct { - Name string `json:"name"` - Symbol string `json:"symbol"` -} - -type PoolMinuteData struct { - ID string `json:"id"` - PoolID string `json:"poolID"` - PeriodStartUnix int `json:"periodStartUnix"` - Token0 Tokens `json:"token0"` - Token1 Tokens `json:"token1"` - Token0Price string `json:"token0Price"` - Token1Price string `json:"token1Price"` - Token0Volume string `json:"token0Volume"` - Token1Volume string `json:"token1Volume"` - VolumeUSDTracked string `json:"volumeUSDTracked"` -} - -type PoolHourData struct { - ID string `json:"id"` - PoolID string `json:"poolID"` - PeriodStartUnix float64 `json:"periodStartUnix"` - Token0 Tokens `json:"token0"` - Token1 Tokens `json:"token1"` - Token0Price string `json:"token0Price"` - Token1Price string `json:"token1Price"` - VolumeUSDTracked string `json:"volumeUSDTracked"` - Token0Volume string `json:"token0Volume"` - Token1Volume string `json:"token1Volume"` - VolumeUSDUntracked string `json:"volumeUSDUntracked"` -} - -type PoolMinuteDataResponse struct { - Data struct { - PoolMinuteDatas []PoolMinuteData `json:"poolMinuteDatas"` - } `json:"data"` -} - -type PoolHourDataResponse struct { - Data struct { - PoolHourDatas []PoolHourData `json:"poolHourDatas"` - } `json:"data"` -} - -// setMockData generates random data for eth price and pool minute and hour data -func (p *ProviderTestSuite) setMockData() { - p.pairAddress = []string{"0xa4e0faA58465A2D369aa21B3e42d43374C6F9613", "0x840DEEef2f115Cf50DA625F7368C24af6fE74410"} - p.ethPriceUSD = strconv.FormatFloat(rand.Float64()*3000, 'f', -1, 64) - p.totalVolume = make([]sdk.Dec, len(p.pairAddress)) - - // generate 24 pool data for each pair - for i, pair := range p.pairAddress { - // generate address pair - p.totalVolume[i] = sdk.ZeroDec() - - cPair := types.CurrencyPair{ - Base: fmt.Sprintf("SYBMOL0%d", i), - Quote: fmt.Sprintf("SYBMOL1%d", i), - Address: pair, +func TestUniswapProvider_GetTickerPrices(t *testing.T) { + p, err := NewUniswapProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + OSMOATOM, + ) + require.NoError(t, err) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := sdk.MustNewDecFromStr("34.69000000") + volume := sdk.MustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]types.TickerPrice{} + tickerMap["OSMO/ATOM"] = types.TickerPrice{ + Price: lastPrice, + Volume: volume, } - p.currencyPairs = append(p.currencyPairs, cPair) + p.tickers = tickerMap - for j := 0; j < 24; j++ { - volFloat := strconv.FormatFloat(rand.Float64()*10000, 'f', -1, 64) - vol, _ := toSdkDec(volFloat) - p.hourData = append(p.hourData, PoolHourData{ - ID: fmt.Sprintf("%s-%d", pair, j), - PoolID: pair, - PeriodStartUnix: float64(time.Now().Unix() - int64(24*60*60*j)), - Token0: Tokens{ - Name: fmt.Sprintf("TEST0%d", i), - Symbol: fmt.Sprintf("SYBMOL0%d", i), - }, - Token1: Tokens{ - Name: fmt.Sprintf("TEST1%d", i), - Symbol: fmt.Sprintf("SYBMOL1%d", i), - }, - Token0Price: strconv.FormatFloat(rand.Float64()*3000, 'f', -1, 64), - Token1Price: strconv.FormatFloat(rand.Float64()*10000, 'f', -1, 64), - VolumeUSDTracked: volFloat, - Token0Volume: volFloat, - Token1Volume: volFloat, - }) + prices, err := p.GetTickerPrices(OSMOATOM) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, lastPrice, prices[OSMOATOM].Price) + require.Equal(t, volume, prices[OSMOATOM].Volume) + }) - p.totalVolume[i].Set(p.totalVolume[i].Add(vol)) - } - } + t.Run("valid_request_multi_ticker", func(t *testing.T) { + lastPriceAtom := sdk.MustNewDecFromStr("34.69000000") + lastPriceLuna := sdk.MustNewDecFromStr("41.35000000") + volume := sdk.MustNewDecFromStr("2396974.02000000") - // generate 10 minute pool data for each pair - for i, pair := range p.pairAddress { - for j := 0; j < 10; j++ { - vol := strconv.FormatFloat(rand.Float64()*10000, 'f', -1, 64) - p.minuteData = append(p.minuteData, PoolMinuteData{ - ID: fmt.Sprintf("%s-%d", pair, j), - PoolID: pair, - PeriodStartUnix: int(time.Now().Unix() - int64(60*j)), - Token0: Tokens{ - Name: fmt.Sprintf("TEST0%d", i), - Symbol: fmt.Sprintf("SYBMOL0%d", i), - }, - Token1: Tokens{ - Name: fmt.Sprintf("TEST1%d", i), - Symbol: fmt.Sprintf("SYBMOL1%d", i), - }, - Token0Price: strconv.FormatFloat(rand.Float64()*3000, 'f', -1, 64), - Token1Price: strconv.FormatFloat(rand.Float64()*10000, 'f', -1, 64), - VolumeUSDTracked: vol, - Token0Volume: vol, - Token1Volume: vol, - }) + tickerMap := map[string]types.TickerPrice{} + tickerMap["ATOM/USDT"] = types.TickerPrice{ + Price: lastPriceAtom, + Volume: volume, } - } -} -func (p *ProviderTestSuite) setupMockServer() { - server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.Header().Set("Content-Type", "application/json") - - var jsonResponse []byte - var response interface{} - switch req.URL.Path { - case "/bundle": - response = BundleResponse{ - Data: struct { - Bundle Bundle `json:"bundle"` - }{ - Bundle: Bundle{ - EthPriceUSD: p.ethPriceUSD, - ID: "1", - }, - }, - } - - jsonResponse, _ = json.Marshal(response) - - case "/poolHourData": - response = PoolHourDataResponse{ - Data: struct { - PoolHourDatas []PoolHourData `json:"poolHourDatas"` - }(struct{ PoolHourDatas []PoolHourData }{PoolHourDatas: p.hourData}), - } - - jsonResponse, _ = json.Marshal(response) - case "/poolMinuteData": - response = PoolMinuteDataResponse{ - Data: struct { - PoolMinuteDatas []PoolMinuteData `json:"poolMinuteDatas"` - }(struct{ PoolMinuteDatas []PoolMinuteData }{PoolMinuteDatas: p.minuteData}), - } - - jsonResponse, _ = json.Marshal(response) - default: - http.NotFound(res, req) - return + tickerMap["LUNA/USDT"] = types.TickerPrice{ + Price: lastPriceLuna, + Volume: volume, } - res.Write(jsonResponse) - })) - - p.server = server -} - -func (p *ProviderTestSuite) createClient() { - // create clients and pairs - // create pair name to address map - denomToAddress := make(map[string]string) - addressToPair := make(map[string]types.CurrencyPair) - for _, pair := range p.currencyPairs { - // graph supports all lower case id's - // currently supports only 1 fee tier pool per currency pair - address := strings.ToLower(pair.Address) - denomToAddress[pair.String()] = address - addressToPair[address] = pair - } - - // default provider to eth uniswap - logger := zerolog.Logger{}.Output(zerolog.ConsoleWriter{Out: os.Stderr}).Level(zerolog.ErrorLevel) - uniswapLogger := logger.With().Str("provider", "eth-uniswap").Logger() - provider := &UniswapProvider{ - baseURL: p.server.URL, - tickerClient: nil, - candleClient: graphql.NewClient(fmt.Sprintf(p.server.URL+"/poolMinuteData"), p.server.Client()), - denomToAddress: denomToAddress, - addressToPair: addressToPair, - logger: uniswapLogger, - pairs: p.currencyPairs, - mut: sync.Mutex{}, - } - - p.provider = provider -} - -type ProviderTestSuite struct { - suite.Suite - server *httptest.Server - provider *UniswapProvider - ethPriceUSD string - pairAddress []string - currencyPairs []types.CurrencyPair - minuteData []PoolMinuteData - hourData []PoolHourData - totalVolume []sdk.Dec -} - -func (p *ProviderTestSuite) SetupSuite() { - p.setMockData() - p.setupMockServer() - p.createClient() -} - -func (p *ProviderTestSuite) TeadDownSuite() { - p.server.Close() -} - -func (p *ProviderTestSuite) TestGetBundle() { - p.provider.tickerClient = graphql.NewClient(fmt.Sprintf(p.server.URL+"/bundle"), p.server.Client()) - ethPrice, err := p.provider.GetBundle() - p.NoError(err) - - price, err := strconv.ParseFloat(p.ethPriceUSD, 64) - p.Require().NoError(err) - - p.EqualValues(price, ethPrice) -} - -func (p *ProviderTestSuite) TestGetTickerPrices() { - p.provider.tickerClient = graphql.NewClient(fmt.Sprintf(p.server.URL+"/poolHourData"), p.server.Client()) - err := p.provider.getHourAndMinuteData(context.Background()) - p.NoError(err) - - data, err := p.provider.GetTickerPrices(p.currencyPairs...) - p.NoError(err) - - p.Len(data, len(p.currencyPairs)) - - for i, pair := range p.currencyPairs { - hourData := p.hourData[i*24] - - price, err := toSdkDec(hourData.Token1Price) - p.NoError(err) - - ticker := data[pair] - p.EqualValues(ticker.Price.String(), price.String()) - p.EqualValues(ticker.Volume.String(), p.totalVolume[i].String()) - } -} - -func (p *ProviderTestSuite) TestGetCandlePrices() { - p.provider.tickerClient = graphql.NewClient(fmt.Sprintf(p.server.URL+"/poolHourData"), p.server.Client()) - err := p.provider.getHourAndMinuteData(context.Background()) - p.NoError(err) - - data, err := p.provider.GetCandlePrices(p.currencyPairs...) - p.NoError(err) - p.Len(data, len(p.currencyPairs)) - - for i, pair := range p.currencyPairs { - candleData := data[pair] - minuteData := p.minuteData[i*10 : (i+1)*10] + p.tickers = tickerMap + prices, err := p.GetTickerPrices( + ATOMUSDT, + LUNAUSDT, + ) + require.NoError(t, err) + require.Len(t, prices, 2) + require.Equal(t, lastPriceAtom, prices[ATOMUSDT].Price) + require.Equal(t, volume, prices[ATOMUSDT].Volume) + require.Equal(t, lastPriceLuna, prices[LUNAUSDT].Price) + require.Equal(t, volume, prices[LUNAUSDT].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, _ := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) +} + +func TestUniswapProvider_GetCandlePrices(t *testing.T) { + p, err := NewUniswapProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}, + ) + require.NoError(t, err) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := "34.689998626708984000" + volume := "2396974.000000000000000000" + time := int64(1000000) + + candle := UniswapCandle{ + Volume: volume, + Close: price, + EndTime: time, + } - for j, candle := range candleData { - price, err := toSdkDec(minuteData[j].Token1Price) - p.NoError(err) + p.setCandlePair(candle, "OSMO/ATOM") - vol, err := toSdkDec(minuteData[j].VolumeUSDTracked) - p.NoError(err) + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "OSMO", Quote: "ATOM"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, sdk.MustNewDecFromStr(price), prices[OSMOATOM][0].Price) + require.Equal(t, sdk.MustNewDecFromStr(volume), prices[OSMOATOM][0].Volume) + require.Equal(t, time, prices[OSMOATOM][0].TimeStamp) + }) - p.EqualValues(candle.Price.String(), price.String()) - p.EqualValues(candle.Volume.String(), vol.String()) - } - } + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, _ := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.Empty(t, prices) + }) } -func TestProviderTestSuite(t *testing.T) { - suite.Run(t, new(ProviderTestSuite)) +func TestUniswapCurrencyPairToUniswapPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + uniswapSymbol := currencyPairToUniswapPair(cp) + require.Equal(t, uniswapSymbol, "ATOM/USDT") } diff --git a/tests/integration/provider_test.go b/tests/integration/provider_test.go index 8358ea80..aa0fa806 100644 --- a/tests/integration/provider_test.go +++ b/tests/integration/provider_test.go @@ -50,16 +50,17 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() { var waitGroup sync.WaitGroup for key, pairs := range cfg.ProviderPairs() { - waitGroup.Add(1) providerName := key currencyPairs := pairs + waitGroup.Add(1) go func() { defer waitGroup.Done() endpoint := endpoints[providerName] ctx, cancel := context.WithCancel(context.Background()) s.T().Logf("Checking %s provider with currency pairs %+v", providerName, currencyPairs) - pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) + pvd, err := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) + require.NoError(s.T(), err) pvd.StartConnections() time.Sleep(60 * time.Second) // wait for provider to connect and receive some prices checkForPrices(s.T(), pvd, currencyPairs, providerName.String())