diff --git a/relayer/cmd/relayer.go b/relayer/cmd/relayer.go index 1071939..c80e704 100644 --- a/relayer/cmd/relayer.go +++ b/relayer/cmd/relayer.go @@ -121,6 +121,7 @@ func relayerCmdHandler(cmd *cobra.Command, args []string) error { cfg.Account.Address, cfg.RPC.GRPCEndpoint, cfg.Gas, + cfg.GasPrices, ) if err != nil { return err diff --git a/relayer/config/config.go b/relayer/config/config.go index 784e4ee..2b870f6 100644 --- a/relayer/config/config.go +++ b/relayer/config/config.go @@ -26,6 +26,7 @@ type ( Keyring Keyring `mapstructure:"keyring" validate:"required,gt=0,dive,required"` RPC RPC `mapstructure:"rpc" validate:"required,gt=0,dive,required"` Gas uint64 `mapstructure:"gas"` + GasPrices string `mapstructure:"gas_prices"` Relayer Relayer `mapstructure:"relayer" validate:"required,gt=0,dive,required"` Assets []Assets `mapstructure:"assets" validate:"required,gt=0,dive,required"` } diff --git a/relayer/relayer.toml b/relayer/relayer.toml index 5bb3df0..4a406ec 100644 --- a/relayer/relayer.toml +++ b/relayer/relayer.toml @@ -1,8 +1,9 @@ -gas = 50000 +gas = 1000000 +gas_prices = "0.025uojo" [account] -address = "ojo1zypqa76je7pxsdwkfah6mu9a583sju6xzthge3" -chain_id = "ojo-testnet" +address = "ojo1kjqcup59v5jtlykewz90em6v0cz7tqpd7u7nyr" +chain_id = "agamotto" [keyring] backend = "test" @@ -14,7 +15,10 @@ rpc_timeout = "100ms" tmrpc_endpoint = "http://localhost:26657" [relayer] +# "heartbeat" interval for the relayer interval = "24h" +# deviation expressed as a percentage +# e.g., 0.01 means 1% deviation = "0.05" destination = "Arbitrum" contract = "0x001" diff --git a/relayer/relayer/client/client.go b/relayer/relayer/client/client.go index 343e97a..4603cc1 100644 --- a/relayer/relayer/client/client.go +++ b/relayer/relayer/client/client.go @@ -66,6 +66,7 @@ func NewRelayerClient( relayerAddrString string, grpcEndpoint string, gas uint64, + gasPrices string, ) (RelayerClient, error) { relayerAddr, err := sdk.AccAddressFromBech32(relayerAddrString) if err != nil { @@ -84,6 +85,7 @@ func NewRelayerClient( RelayerAddrString: relayerAddrString, Encoding: ojoparams.MakeEncodingConfig(), Gas: gas, + GasPrices: gasPrices, GRPCEndpoint: grpcEndpoint, } diff --git a/relayer/relayer/relayer.go b/relayer/relayer/relayer.go index 0ee024d..e04e43a 100644 --- a/relayer/relayer/relayer.go +++ b/relayer/relayer/relayer.go @@ -83,61 +83,93 @@ func (o *Relayer) Stop() { <-o.closer.Done() } -func (r *Relayer) tick(ctx context.Context) error { - r.logger.Debug().Msg("executing relayer tick") +// init initializes the relayer by submitting a relay for +// each asset in the config and setting the latest price info +// in memory. +func (r *Relayer) init(ctx context.Context) error { + latestAssets := make([]asset, len(r.cfg.Assets)) + batch := []string{} + for k, v := range r.cfg.Assets { + // Get price + price, err := r.relayerClient.GetPrice(ctx, v.Denom) + if err != nil { + r.logger.Err(err).Msg("unable to communicate with ojo node") + return err + } + priceFl, err := price.Amount.Float64() + if err != nil { + r.logger.Err(err).Msg("unable to convert price to float64") + return err + } - // Check if it is our first tick - if len(r.latestAssets) == 0 { - latestAssets := make([]asset, len(r.cfg.Assets)) - for k, v := range r.cfg.Assets { - // Get price - price, err := r.relayerClient.GetPrice(ctx, v.Denom) - if err != nil { - r.logger.Err(err).Msg("unable to communicate with ojo node") - return err - } - priceFl, err := price.Amount.Float64() - if err != nil { - r.logger.Err(err).Msg("unable to convert price to float64") - return err - } + // Set latest update in memory + latestAssets[k] = asset{ + lastPrice: priceFl, + lastRelay: time.Now(), + denom: v.Denom, + } - // Set latest update in memory - latestAssets[k] = asset{ - lastPrice: priceFl, - lastRelay: time.Now(), - denom: v.Denom, - } + // Add to batch + batch = append(batch, v.Denom) + } + + // Relay price + if err := r.relay(batch); err != nil { + r.logger.Err(err).Msg("unable to submit initial relays") + return err + } + + // Set to memory + r.latestAssets = latestAssets + return nil +} - // Relay price - if err := r.relay(v.Denom); err != nil { - r.logger.Err(err).Msg("unable to relay price") - return err +// updateMemory takes a set of denoms and updates the memory with the latest price +// and timestamp. +func (r *Relayer) updateMemory(ctx context.Context, denoms []string) error { + for _, v := range denoms { + price, err := r.getPrice(ctx, v) + if err != nil { + r.logger.Err(err).Msg("unable to communicate with ojo node") + return err + } + + for k, a := range r.latestAssets { + if a.denom == v { + r.latestAssets[k].lastPrice = price + r.latestAssets[k].lastRelay = time.Now() } } + } - // Set to memory - r.latestAssets = latestAssets + return nil +} + +func (r *Relayer) tick(ctx context.Context) error { + r.logger.Debug().Msg("executing relayer tick") + + // check if it is our first tick + if len(r.latestAssets) == 0 { + err := r.init(ctx) + if err != nil { + return err + } return nil } + // if it's not our first tick, check to see if we need + // to do any relays. + + // denomsBatch is a slice of denoms that we need to relay + batch := []string{} + // if not, check for heartbeats and deviations for _, v := range r.latestAssets { // if heartbeat needs to be sent, relay if heartbeat(r.cfg.Relayer.Interval, v.lastRelay) { - err := r.relay(v.denom) - if err != nil { - return err - } - - // update memory - v.lastRelay = time.Now() - v.lastPrice, err = r.getPrice(ctx, v.denom) - if err != nil { - r.logger.Err(err).Msg("unable to communicate with ojo node") - return err - } - return nil + batch = append(batch, v.denom) + r.logger.Info().Str("denom", v.denom).Msg("heartbeat relay") + continue } // if price has deviated, send a relay @@ -146,17 +178,27 @@ func (r *Relayer) tick(ctx context.Context) error { r.logger.Err(err).Msg("unable to communicate with ojo node") return err } - if deviated(price, r.cfg.Relayer.Deviation, v.lastPrice) { - - err := r.relay(v.denom) - if err != nil { - return err - } + pct, dev := deviated(v.lastPrice, price, r.cfg.Relayer.Deviation) + if dev { + batch = append(batch, v.denom) + r.logger.Info().Str("denom", v.denom). + Float64("last_updated_price", v.lastPrice). + Float64("new_price", price). + Float64("deviation_percentage", pct). + Float64("deviation_threshold", r.cfg.Relayer.Deviation). + Msg("deviation relay") + } + } - v.lastRelay = time.Now() - v.lastPrice = price - return nil + // batch relays and then update memory + if len(batch) > 0 { + if err := r.relay(batch); err != nil { + r.logger.Err(err).Msg("unable to relay price") + return err } + r.updateMemory(ctx, batch) + } else { + r.logger.Info().Msg("no relays necessary") } return nil @@ -164,16 +206,29 @@ func (r *Relayer) tick(ctx context.Context) error { // heartbeat checks the time since last relay and returns true if we need to relay. func heartbeat(interval time.Duration, lastUpdate time.Time) bool { - return time.Since(lastUpdate) > interval + return time.Since(lastUpdate) >= interval } // deviated checks if the price has deviated from the last price by the deviation %. -func deviated(price float64, deviation float64, lastPrice float64) bool { - return price > lastPrice*(1+deviation) || price < lastPrice*(1-deviation) +func deviated(existingPrice float64, newestPrice float64, threshold float64) (float64, bool) { + if existingPrice == 0 { + return 0, false + } + + // calculate the deviation percentage between price and lastPrice + deviationPct := (newestPrice - existingPrice) / existingPrice + + // get absolute value + if deviationPct < 0 { + deviationPct *= -1 + } + + return deviationPct, deviationPct >= threshold } // relay sends a relay message to the Ojo node. -func (r Relayer) relay(denom string) error { +func (r Relayer) relay(denoms []string) error { + r.logger.Info().Strs("denoms", denoms).Msg("submitting relay tx") // normalize the coin denom coins, err := sdk.ParseCoinNormalized(r.cfg.Relayer.Tokens) if err != nil { @@ -187,15 +242,14 @@ func (r Relayer) relay(denom string) error { msg := gmptypes.NewMsgRelay( r.cfg.Account.Address, r.cfg.Relayer.Destination, - "0x001", // ojo contract address - empty r.cfg.Relayer.Contract, - coins, // tokens we're paying with - []string{denom}, + "0x001", // ojo contract address - empty + coins, // tokens we're paying with + denoms, // tokens we're relaying []byte{}, // command selector - empty []byte{}, // params - empty, no callback time.Now().Unix(), // unix timestamp ) - currentHeight, err := r.relayerClient.ChainHeight.GetChainHeight() if err != nil { return err diff --git a/relayer/relayer/relayer_test.go b/relayer/relayer/relayer_test.go new file mode 100644 index 0000000..98a53d9 --- /dev/null +++ b/relayer/relayer/relayer_test.go @@ -0,0 +1,106 @@ +package relayer + +import ( + "testing" + "time" +) + +func TestHeartbeat(t *testing.T) { + tests := []struct { + name string + interval time.Duration + lastUpdate time.Time + want bool + }{ + { + name: "Time since last update is less than interval", + interval: 5 * time.Second, + lastUpdate: time.Now().Add(-3 * time.Second), + want: false, + }, + { + name: "Time since last update is equal to interval", + interval: 5 * time.Second, + lastUpdate: time.Now().Add(-5 * time.Second), + want: true, + }, + { + name: "Time since last update is greater than interval", + interval: 5 * time.Second, + lastUpdate: time.Now().Add(-7 * time.Second), + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := heartbeat(tt.interval, tt.lastUpdate); got != tt.want { + t.Errorf("heartbeat() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDeviated(t *testing.T) { + tests := []struct { + name string + price float64 + newPrice float64 + threshold float64 + wantDev float64 + wantBool bool + }{ + { + name: "No deviation", + price: 100.0, + newPrice: 100.0, + threshold: 10.0, + wantDev: 0.0, + wantBool: false, + }, + { + name: "Deviation less than threshold", + price: 100.0, + newPrice: 105.0, + threshold: 0.10, + wantDev: 0.05, + wantBool: false, + }, + { + name: "Deviation equal to threshold", + price: 100.0, + newPrice: 110.0, + threshold: 0.10, + wantDev: 0.10, + wantBool: true, + }, + { + name: "Deviation greater than threshold", + price: 100.0, + newPrice: 115.0, + threshold: 0.10, + wantDev: 0.15, + wantBool: true, + }, + { + name: "Existing price larger than new price", + price: 100.0, + newPrice: 85.0, + threshold: 0.10, + wantDev: 0.15, + wantBool: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDev, gotBool := deviated(tt.price, tt.newPrice, tt.threshold) + if gotDev != tt.wantDev { + t.Errorf("deviated() gotDev = %v, want %v", gotDev, tt.wantDev) + } + if gotBool != tt.wantBool { + t.Errorf("deviated() gotBool = %v, want %v", gotBool, tt.wantBool) + } + }) + } +}