Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relayer batching #42

Merged
merged 2 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions relayer/cmd/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func relayerCmdHandler(cmd *cobra.Command, args []string) error {
cfg.Account.Address,
cfg.RPC.GRPCEndpoint,
cfg.Gas,
cfg.GasPrices,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions relayer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
Keyring Keyring `mapstructure:"keyring" validate:"required,gt=0,dive,required"`
RPC RPC `mapstructure:"rpc" validate:"required,gt=0,dive,required"`
Gas uint64 `mapstructure:"gas"`
GasPrices string `mapstructure:"gas_prices"`
Relayer Relayer `mapstructure:"relayer" validate:"required,gt=0,dive,required"`
Assets []Assets `mapstructure:"assets" validate:"required,gt=0,dive,required"`
}
Expand Down
10 changes: 7 additions & 3 deletions relayer/relayer.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
gas = 50000
gas = 1000000
gas_prices = "0.025uojo"

[account]
address = "ojo1zypqa76je7pxsdwkfah6mu9a583sju6xzthge3"
chain_id = "ojo-testnet"
address = "ojo1kjqcup59v5jtlykewz90em6v0cz7tqpd7u7nyr"
chain_id = "agamotto"

[keyring]
backend = "test"
Expand All @@ -14,7 +15,10 @@ rpc_timeout = "100ms"
tmrpc_endpoint = "http://localhost:26657"

[relayer]
# "heartbeat" interval for the relayer
interval = "24h"
# deviation expressed as a percentage
# e.g., 0.01 means 1%
deviation = "0.05"
destination = "Arbitrum"
contract = "0x001"
Expand Down
2 changes: 2 additions & 0 deletions relayer/relayer/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewRelayerClient(
relayerAddrString string,
grpcEndpoint string,
gas uint64,
gasPrices string,
) (RelayerClient, error) {
relayerAddr, err := sdk.AccAddressFromBech32(relayerAddrString)
if err != nil {
Expand All @@ -84,6 +85,7 @@ func NewRelayerClient(
RelayerAddrString: relayerAddrString,
Encoding: ojoparams.MakeEncodingConfig(),
Gas: gas,
GasPrices: gasPrices,
GRPCEndpoint: grpcEndpoint,
}

Expand Down
172 changes: 113 additions & 59 deletions relayer/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,61 +83,93 @@ func (o *Relayer) Stop() {
<-o.closer.Done()
}

func (r *Relayer) tick(ctx context.Context) error {
r.logger.Debug().Msg("executing relayer tick")
// init initializes the relayer by submitting a relay for
// each asset in the config and setting the latest price info
// in memory.
func (r *Relayer) init(ctx context.Context) error {
latestAssets := make([]asset, len(r.cfg.Assets))
batch := []string{}
for k, v := range r.cfg.Assets {
// Get price
price, err := r.relayerClient.GetPrice(ctx, v.Denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
priceFl, err := price.Amount.Float64()
if err != nil {
r.logger.Err(err).Msg("unable to convert price to float64")
return err
}

// Check if it is our first tick
if len(r.latestAssets) == 0 {
latestAssets := make([]asset, len(r.cfg.Assets))
for k, v := range r.cfg.Assets {
// Get price
price, err := r.relayerClient.GetPrice(ctx, v.Denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
priceFl, err := price.Amount.Float64()
if err != nil {
r.logger.Err(err).Msg("unable to convert price to float64")
return err
}
// Set latest update in memory
latestAssets[k] = asset{
lastPrice: priceFl,
lastRelay: time.Now(),
denom: v.Denom,
}

// Set latest update in memory
latestAssets[k] = asset{
lastPrice: priceFl,
lastRelay: time.Now(),
denom: v.Denom,
}
// Add to batch
batch = append(batch, v.Denom)
}

// Relay price
if err := r.relay(batch); err != nil {
r.logger.Err(err).Msg("unable to submit initial relays")
return err
}

// Set to memory
r.latestAssets = latestAssets
return nil
}

// Relay price
if err := r.relay(v.Denom); err != nil {
r.logger.Err(err).Msg("unable to relay price")
return err
// updateMemory takes a set of denoms and updates the memory with the latest price
// and timestamp.
func (r *Relayer) updateMemory(ctx context.Context, denoms []string) error {
for _, v := range denoms {
price, err := r.getPrice(ctx, v)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}

for k, a := range r.latestAssets {
if a.denom == v {
r.latestAssets[k].lastPrice = price
r.latestAssets[k].lastRelay = time.Now()
}
}
}

// Set to memory
r.latestAssets = latestAssets
return nil
}

func (r *Relayer) tick(ctx context.Context) error {
r.logger.Debug().Msg("executing relayer tick")

// check if it is our first tick
if len(r.latestAssets) == 0 {
err := r.init(ctx)
if err != nil {
return err
}
return nil
}

// if it's not our first tick, check to see if we need
// to do any relays.

// denomsBatch is a slice of denoms that we need to relay
batch := []string{}

// if not, check for heartbeats and deviations
for _, v := range r.latestAssets {
// if heartbeat needs to be sent, relay
if heartbeat(r.cfg.Relayer.Interval, v.lastRelay) {
err := r.relay(v.denom)
if err != nil {
return err
}

// update memory
v.lastRelay = time.Now()
v.lastPrice, err = r.getPrice(ctx, v.denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
return nil
batch = append(batch, v.denom)
r.logger.Info().Str("denom", v.denom).Msg("heartbeat relay")
continue
}

// if price has deviated, send a relay
Expand All @@ -146,34 +178,57 @@ func (r *Relayer) tick(ctx context.Context) error {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
if deviated(price, r.cfg.Relayer.Deviation, v.lastPrice) {

err := r.relay(v.denom)
if err != nil {
return err
}
pct, dev := deviated(v.lastPrice, price, r.cfg.Relayer.Deviation)
if dev {
batch = append(batch, v.denom)
r.logger.Info().Str("denom", v.denom).
Float64("last_updated_price", v.lastPrice).
Float64("new_price", price).
Float64("deviation_percentage", pct).
Float64("deviation_threshold", r.cfg.Relayer.Deviation).
Msg("deviation relay")
}
}

v.lastRelay = time.Now()
v.lastPrice = price
return nil
// batch relays and then update memory
if len(batch) > 0 {
if err := r.relay(batch); err != nil {
r.logger.Err(err).Msg("unable to relay price")
return err
}
r.updateMemory(ctx, batch)
} else {
r.logger.Info().Msg("no relays necessary")
}

return nil
}

// heartbeat checks the time since last relay and returns true if we need to relay.
func heartbeat(interval time.Duration, lastUpdate time.Time) bool {
return time.Since(lastUpdate) > interval
return time.Since(lastUpdate) >= interval
}

// deviated checks if the price has deviated from the last price by the deviation %.
func deviated(price float64, deviation float64, lastPrice float64) bool {
return price > lastPrice*(1+deviation) || price < lastPrice*(1-deviation)
func deviated(existingPrice float64, newestPrice float64, threshold float64) (float64, bool) {
if existingPrice == 0 {
return 0, false
}

// calculate the deviation percentage between price and lastPrice
deviationPct := (newestPrice - existingPrice) / existingPrice

// get absolute value
if deviationPct < 0 {
deviationPct *= -1
}

return deviationPct, deviationPct >= threshold
}

// relay sends a relay message to the Ojo node.
func (r Relayer) relay(denom string) error {
func (r Relayer) relay(denoms []string) error {
r.logger.Info().Strs("denoms", denoms).Msg("submitting relay tx")
// normalize the coin denom
coins, err := sdk.ParseCoinNormalized(r.cfg.Relayer.Tokens)
if err != nil {
Expand All @@ -187,15 +242,14 @@ func (r Relayer) relay(denom string) error {
msg := gmptypes.NewMsgRelay(
r.cfg.Account.Address,
r.cfg.Relayer.Destination,
"0x001", // ojo contract address - empty
r.cfg.Relayer.Contract,
coins, // tokens we're paying with
[]string{denom},
"0x001", // ojo contract address - empty
coins, // tokens we're paying with
denoms, // tokens we're relaying
[]byte{}, // command selector - empty
[]byte{}, // params - empty, no callback
time.Now().Unix(), // unix timestamp
)

currentHeight, err := r.relayerClient.ChainHeight.GetChainHeight()
if err != nil {
return err
Expand Down
106 changes: 106 additions & 0 deletions relayer/relayer/relayer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package relayer

import (
"testing"
"time"
)

func TestHeartbeat(t *testing.T) {
tests := []struct {
name string
interval time.Duration
lastUpdate time.Time
want bool
}{
{
name: "Time since last update is less than interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-3 * time.Second),
want: false,
},
{
name: "Time since last update is equal to interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-5 * time.Second),
want: true,
},
{
name: "Time since last update is greater than interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-7 * time.Second),
want: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := heartbeat(tt.interval, tt.lastUpdate); got != tt.want {
t.Errorf("heartbeat() = %v, want %v", got, tt.want)
}
})
}
}

func TestDeviated(t *testing.T) {
tests := []struct {
name string
price float64
newPrice float64
threshold float64
wantDev float64
wantBool bool
}{
{
name: "No deviation",
price: 100.0,
newPrice: 100.0,
threshold: 10.0,
wantDev: 0.0,
wantBool: false,
},
{
name: "Deviation less than threshold",
price: 100.0,
newPrice: 105.0,
threshold: 0.10,
wantDev: 0.05,
wantBool: false,
},
{
name: "Deviation equal to threshold",
price: 100.0,
newPrice: 110.0,
threshold: 0.10,
wantDev: 0.10,
wantBool: true,
},
{
name: "Deviation greater than threshold",
price: 100.0,
newPrice: 115.0,
threshold: 0.10,
wantDev: 0.15,
wantBool: true,
},
{
name: "Existing price larger than new price",
price: 100.0,
newPrice: 85.0,
threshold: 0.10,
wantDev: 0.15,
wantBool: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotDev, gotBool := deviated(tt.price, tt.newPrice, tt.threshold)
if gotDev != tt.wantDev {
t.Errorf("deviated() gotDev = %v, want %v", gotDev, tt.wantDev)
}
if gotBool != tt.wantBool {
t.Errorf("deviated() gotBool = %v, want %v", gotBool, tt.wantBool)
}
})
}
}
Loading