Skip to content

Commit

Permalink
feat: Read currency pair providers and deviations from on chain params (
Browse files Browse the repository at this point in the history
#260)

* Add on chain config read

* update ojo commit import

* lint

* check for currency pair updates

* lint

* consume param update event

* implement updated parame update event

* lint

* gosum

* gomod

* pr comments
  • Loading branch information
rbajollari authored Oct 31, 2023
1 parent 0a9b2f2 commit 4ff8c0e
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 268 deletions.
25 changes: 22 additions & 3 deletions cmd/price-feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ const (
logLevelJSON = "json"
logLevelText = "text"

flagLogLevel = "log-level"
flagLogFormat = "log-format"
flagSkipProviderCheck = "skip-provider-check"
flagLogLevel = "log-level"
flagLogFormat = "log-format"
flagSkipProviderCheck = "skip-provider-check"
flagConfigCurrencyProviders = "config-currency-providers"

envVariablePass = "PRICE_FEEDER_PASS"
)
Expand All @@ -61,6 +62,11 @@ func init() {
rootCmd.PersistentFlags().String(flagLogLevel, zerolog.InfoLevel.String(), "logging level")
rootCmd.PersistentFlags().String(flagLogFormat, logLevelText, "logging format; must be either json or text")
rootCmd.PersistentFlags().Bool(flagSkipProviderCheck, false, "skip the coingecko API provider check")
rootCmd.PersistentFlags().Bool(
flagConfigCurrencyProviders,
false,
"use config file for currency pair providers and deviations instead of on chain values",
)

rootCmd.AddCommand(getVersionCmd())
}
Expand Down Expand Up @@ -95,6 +101,11 @@ func priceFeederCmdHandler(cmd *cobra.Command, args []string) error {
return err
}

configCurrencyProviders, err := cmd.Flags().GetBool(flagConfigCurrencyProviders)
if err != nil {
return err
}

var logWriter io.Writer
switch strings.ToLower(logFormatStr) {
case logLevelJSON:
Expand Down Expand Up @@ -173,8 +184,16 @@ func priceFeederCmdHandler(cmd *cobra.Command, args []string) error {
providerTimeout,
deviations,
cfg.ProviderEndpointsMap(),
!configCurrencyProviders,
)

if !configCurrencyProviders {
err := oracle.LoadProviderPairsAndDeviations(ctx)
if err != nil {
return err
}
}

telemetryCfg := telemetry.Config{}
err = mapstructure.Decode(cfg.Telemetry, &telemetryCfg)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/hasura/go-graphql-client v0.10.0
github.com/justinas/alice v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/ojo-network/ojo v0.2.1
github.com/ojo-network/ojo v0.2.2-0.20231010234643-a2f77484eaf6
github.com/rs/cors v1.10.1
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.7.0
Expand Down Expand Up @@ -96,6 +96,7 @@ require (
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/getsentry/sentry-go v0.23.0 // indirect
github.com/gin-gonic/gin v1.8.1 // indirect
github.com/go-critic/go-critic v0.9.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
Expand Down
363 changes: 151 additions & 212 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions ojo-provider-config/currency-pairs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ providers = [
]
quote = "OSMO"

[[currency_pairs]]
base = "CMST"
providers = [
"crescent",
]
quote = "USDC"

[[currency_pairs]]
base = "DAI"
providers = [
Expand Down
157 changes: 112 additions & 45 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type Oracle struct {
oracleClient client.OracleClient
deviations map[string]sdk.Dec
endpoints map[types.ProviderName]provider.Endpoint
paramCache ParamCache
paramCache *ParamCache
chainConfig bool

pricesMutex sync.RWMutex
lastPriceSyncTS time.Time
Expand All @@ -82,6 +83,7 @@ func New(
providerTimeout time.Duration,
deviations map[string]sdk.Dec,
endpoints map[types.ProviderName]provider.Endpoint,
chainConfig bool,
) *Oracle {
return &Oracle{
logger: logger.With().Str("module", "oracle").Logger(),
Expand All @@ -92,13 +94,53 @@ func New(
previousPrevote: nil,
providerTimeout: providerTimeout,
deviations: deviations,
paramCache: ParamCache{},
paramCache: &ParamCache{params: nil},
chainConfig: chainConfig,
endpoints: endpoints,
}
}

// LoadProviderPairsAndDeviations loads the on chain pair providers and
// deviations from the oracle params.
func (o *Oracle) LoadProviderPairsAndDeviations(ctx context.Context) error {
blockHeight, err := o.oracleClient.ChainHeight.GetChainHeight()
if err != nil {
return err
}
if blockHeight < 1 {
return fmt.Errorf("expected positive block height")
}

oracleParams, err := o.GetParamCache(ctx, blockHeight)
if err != nil {
return err
}

o.providerPairs = createPairProvidersFromCurrencyPairProvidersList(oracleParams.CurrencyPairProviders)
o.deviations, err = createDeviationsFromCurrencyDeviationThresholdList(oracleParams.CurrencyDeviationThresholds)
if err != nil {
return err
}

return nil
}

// Start starts the oracle process in a blocking fashion.
func (o *Oracle) Start(ctx context.Context) error {
// initialize param cache
clientCtx, err := o.oracleClient.CreateClientContext()
if err != nil {
return err
}
err = o.paramCache.Initialize(
ctx,
clientCtx.Client,
o.logger,
)
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -351,49 +393,6 @@ func SetProviderTickerPricesAndCandles(
return pricesOk || candlesOk
}

// GetParamCache returns the last updated parameters of the x/oracle module
// if the current ParamCache is outdated, we will query it again.
func (o *Oracle) GetParamCache(ctx context.Context, currentBlockHeigh int64) (oracletypes.Params, error) {
if !o.paramCache.IsOutdated(currentBlockHeigh) {
return *o.paramCache.params, nil
}

params, err := o.GetParams(ctx)
if err != nil {
return oracletypes.Params{}, err
}

o.checkAcceptList(params)
o.paramCache.Update(currentBlockHeigh, params)
return params, nil
}

// GetParams returns the current on-chain parameters of the x/oracle module.
func (o *Oracle) GetParams(ctx context.Context) (oracletypes.Params, error) {
grpcConn, err := grpc.Dial(
o.oracleClient.GRPCEndpoint,
// the Cosmos SDK doesn't support any transport security mechanism
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialerFunc),
)
if err != nil {
return oracletypes.Params{}, fmt.Errorf("failed to dial Cosmos gRPC service: %w", err)
}

defer grpcConn.Close()
queryClient := oracletypes.NewQueryClient(grpcConn)

ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

queryResponse, err := queryClient.Params(ctx, &oracletypes.QueryParams{})
if err != nil {
return oracletypes.Params{}, fmt.Errorf("failed to get x/oracle params: %w", err)
}

return queryResponse.Params, nil
}

func (o *Oracle) getOrSetProvider(ctx context.Context, providerName types.ProviderName) (provider.Provider, error) {
var (
priceProvider provider.Provider
Expand Down Expand Up @@ -480,6 +479,58 @@ func NewProvider(
return nil, fmt.Errorf("provider %s not found", providerName)
}

// GetParamCache returns the last updated parameters of the x/oracle module
// if the current ParamCache is outdated or a param update event was found, the cache is updated.
func (o *Oracle) GetParamCache(ctx context.Context, currentBlockHeight int64) (oracletypes.Params, error) {
if !o.paramCache.IsOutdated(currentBlockHeight) && !o.paramCache.paramUpdateEvent {
return *o.paramCache.params, nil
}

currentParams := o.paramCache.params
newParams, err := o.GetParams(ctx)
if err != nil {
return oracletypes.Params{}, err
}

o.checkAcceptList(newParams)
o.paramCache.UpdateParamCache(currentBlockHeight, newParams, nil)

if o.chainConfig && currentParams != nil {
err = o.checkCurrencyPairAndDeviations(*currentParams, newParams)
if err != nil {
return oracletypes.Params{}, err
}
}

return newParams, nil
}

// GetParams returns the current on-chain parameters of the x/oracle module.
func (o *Oracle) GetParams(ctx context.Context) (oracletypes.Params, error) {
grpcConn, err := grpc.Dial(
o.oracleClient.GRPCEndpoint,
// the Cosmos SDK doesn't support any transport security mechanism
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialerFunc),
)
if err != nil {
return oracletypes.Params{}, fmt.Errorf("failed to dial Cosmos gRPC service: %w", err)
}

defer grpcConn.Close()
queryClient := oracletypes.NewQueryClient(grpcConn)

ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

queryResponse, err := queryClient.Params(ctx, &oracletypes.QueryParams{})
if err != nil {
return oracletypes.Params{}, fmt.Errorf("failed to get x/oracle params: %w", err)
}

return queryResponse.Params, nil
}

func (o *Oracle) checkAcceptList(params oracletypes.Params) {
for _, denom := range params.AcceptList {
symbol := strings.ToUpper(denom.SymbolDenom)
Expand All @@ -490,6 +541,22 @@ func (o *Oracle) checkAcceptList(params oracletypes.Params) {
}
}

func (o *Oracle) checkCurrencyPairAndDeviations(currentParams, newParams oracletypes.Params) (err error) {
if currentParams.CurrencyPairProviders.String() != newParams.CurrencyPairProviders.String() {
o.logger.Debug().Msg("Updating Currency Pair Providers Map")
o.providerPairs = createPairProvidersFromCurrencyPairProvidersList(newParams.CurrencyPairProviders)
}
if currentParams.CurrencyDeviationThresholds.String() != newParams.CurrencyDeviationThresholds.String() {
o.logger.Debug().Msg("Updating Currency Deviation Thresholds Map")
o.deviations, err = createDeviationsFromCurrencyDeviationThresholdList(newParams.CurrencyDeviationThresholds)
if err != nil {
return err
}
}

return nil
}

func (o *Oracle) tick(ctx context.Context) error {
o.logger.Debug().Msg("executing oracle tick")

Expand Down
1 change: 1 addition & 0 deletions oracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (ots *OracleTestSuite) SetupSuite() {
time.Millisecond*100,
make(map[string]sdk.Dec),
make(map[types.ProviderName]provider.Endpoint),
false,
)
}

Expand Down
Loading

0 comments on commit 4ff8c0e

Please sign in to comment.