Skip to content

Commit

Permalink
feat: relayer batching (#42)
Browse files Browse the repository at this point in the history
* feat: relayer batching

* tests++
  • Loading branch information
adamewozniak authored May 10, 2024
1 parent 0058691 commit aeb2c90
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 62 deletions.
1 change: 1 addition & 0 deletions relayer/cmd/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func relayerCmdHandler(cmd *cobra.Command, args []string) error {
cfg.Account.Address,
cfg.RPC.GRPCEndpoint,
cfg.Gas,
cfg.GasPrices,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions relayer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
Keyring Keyring `mapstructure:"keyring" validate:"required,gt=0,dive,required"`
RPC RPC `mapstructure:"rpc" validate:"required,gt=0,dive,required"`
Gas uint64 `mapstructure:"gas"`
GasPrices string `mapstructure:"gas_prices"`
Relayer Relayer `mapstructure:"relayer" validate:"required,gt=0,dive,required"`
Assets []Assets `mapstructure:"assets" validate:"required,gt=0,dive,required"`
}
Expand Down
10 changes: 7 additions & 3 deletions relayer/relayer.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
gas = 50000
gas = 1000000
gas_prices = "0.025uojo"

[account]
address = "ojo1zypqa76je7pxsdwkfah6mu9a583sju6xzthge3"
chain_id = "ojo-testnet"
address = "ojo1kjqcup59v5jtlykewz90em6v0cz7tqpd7u7nyr"
chain_id = "agamotto"

[keyring]
backend = "test"
Expand All @@ -14,7 +15,10 @@ rpc_timeout = "100ms"
tmrpc_endpoint = "http://localhost:26657"

[relayer]
# "heartbeat" interval for the relayer
interval = "24h"
# deviation expressed as a percentage
# e.g., 0.01 means 1%
deviation = "0.05"
destination = "Arbitrum"
contract = "0x001"
Expand Down
2 changes: 2 additions & 0 deletions relayer/relayer/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewRelayerClient(
relayerAddrString string,
grpcEndpoint string,
gas uint64,
gasPrices string,
) (RelayerClient, error) {
relayerAddr, err := sdk.AccAddressFromBech32(relayerAddrString)
if err != nil {
Expand All @@ -84,6 +85,7 @@ func NewRelayerClient(
RelayerAddrString: relayerAddrString,
Encoding: ojoparams.MakeEncodingConfig(),
Gas: gas,
GasPrices: gasPrices,
GRPCEndpoint: grpcEndpoint,
}

Expand Down
172 changes: 113 additions & 59 deletions relayer/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,61 +83,93 @@ func (o *Relayer) Stop() {
<-o.closer.Done()
}

func (r *Relayer) tick(ctx context.Context) error {
r.logger.Debug().Msg("executing relayer tick")
// init initializes the relayer by submitting a relay for
// each asset in the config and setting the latest price info
// in memory.
func (r *Relayer) init(ctx context.Context) error {
latestAssets := make([]asset, len(r.cfg.Assets))
batch := []string{}
for k, v := range r.cfg.Assets {
// Get price
price, err := r.relayerClient.GetPrice(ctx, v.Denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
priceFl, err := price.Amount.Float64()
if err != nil {
r.logger.Err(err).Msg("unable to convert price to float64")
return err
}

// Check if it is our first tick
if len(r.latestAssets) == 0 {
latestAssets := make([]asset, len(r.cfg.Assets))
for k, v := range r.cfg.Assets {
// Get price
price, err := r.relayerClient.GetPrice(ctx, v.Denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
priceFl, err := price.Amount.Float64()
if err != nil {
r.logger.Err(err).Msg("unable to convert price to float64")
return err
}
// Set latest update in memory
latestAssets[k] = asset{
lastPrice: priceFl,
lastRelay: time.Now(),
denom: v.Denom,
}

// Set latest update in memory
latestAssets[k] = asset{
lastPrice: priceFl,
lastRelay: time.Now(),
denom: v.Denom,
}
// Add to batch
batch = append(batch, v.Denom)
}

// Relay price
if err := r.relay(batch); err != nil {
r.logger.Err(err).Msg("unable to submit initial relays")
return err
}

// Set to memory
r.latestAssets = latestAssets
return nil
}

// Relay price
if err := r.relay(v.Denom); err != nil {
r.logger.Err(err).Msg("unable to relay price")
return err
// updateMemory takes a set of denoms and updates the memory with the latest price
// and timestamp.
func (r *Relayer) updateMemory(ctx context.Context, denoms []string) error {
for _, v := range denoms {
price, err := r.getPrice(ctx, v)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}

for k, a := range r.latestAssets {
if a.denom == v {
r.latestAssets[k].lastPrice = price
r.latestAssets[k].lastRelay = time.Now()
}
}
}

// Set to memory
r.latestAssets = latestAssets
return nil
}

func (r *Relayer) tick(ctx context.Context) error {
r.logger.Debug().Msg("executing relayer tick")

// check if it is our first tick
if len(r.latestAssets) == 0 {
err := r.init(ctx)
if err != nil {
return err
}
return nil
}

// if it's not our first tick, check to see if we need
// to do any relays.

// denomsBatch is a slice of denoms that we need to relay
batch := []string{}

// if not, check for heartbeats and deviations
for _, v := range r.latestAssets {
// if heartbeat needs to be sent, relay
if heartbeat(r.cfg.Relayer.Interval, v.lastRelay) {
err := r.relay(v.denom)
if err != nil {
return err
}

// update memory
v.lastRelay = time.Now()
v.lastPrice, err = r.getPrice(ctx, v.denom)
if err != nil {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
return nil
batch = append(batch, v.denom)
r.logger.Info().Str("denom", v.denom).Msg("heartbeat relay")
continue
}

// if price has deviated, send a relay
Expand All @@ -146,34 +178,57 @@ func (r *Relayer) tick(ctx context.Context) error {
r.logger.Err(err).Msg("unable to communicate with ojo node")
return err
}
if deviated(price, r.cfg.Relayer.Deviation, v.lastPrice) {

err := r.relay(v.denom)
if err != nil {
return err
}
pct, dev := deviated(v.lastPrice, price, r.cfg.Relayer.Deviation)
if dev {
batch = append(batch, v.denom)
r.logger.Info().Str("denom", v.denom).
Float64("last_updated_price", v.lastPrice).
Float64("new_price", price).
Float64("deviation_percentage", pct).
Float64("deviation_threshold", r.cfg.Relayer.Deviation).
Msg("deviation relay")
}
}

v.lastRelay = time.Now()
v.lastPrice = price
return nil
// batch relays and then update memory
if len(batch) > 0 {
if err := r.relay(batch); err != nil {
r.logger.Err(err).Msg("unable to relay price")
return err
}
r.updateMemory(ctx, batch)
} else {
r.logger.Info().Msg("no relays necessary")
}

return nil
}

// heartbeat checks the time since last relay and returns true if we need to relay.
func heartbeat(interval time.Duration, lastUpdate time.Time) bool {
return time.Since(lastUpdate) > interval
return time.Since(lastUpdate) >= interval
}

// deviated checks if the price has deviated from the last price by the deviation %.
func deviated(price float64, deviation float64, lastPrice float64) bool {
return price > lastPrice*(1+deviation) || price < lastPrice*(1-deviation)
func deviated(existingPrice float64, newestPrice float64, threshold float64) (float64, bool) {
if existingPrice == 0 {
return 0, false
}

// calculate the deviation percentage between price and lastPrice
deviationPct := (newestPrice - existingPrice) / existingPrice

// get absolute value
if deviationPct < 0 {
deviationPct *= -1
}

return deviationPct, deviationPct >= threshold
}

// relay sends a relay message to the Ojo node.
func (r Relayer) relay(denom string) error {
func (r Relayer) relay(denoms []string) error {
r.logger.Info().Strs("denoms", denoms).Msg("submitting relay tx")
// normalize the coin denom
coins, err := sdk.ParseCoinNormalized(r.cfg.Relayer.Tokens)
if err != nil {
Expand All @@ -187,15 +242,14 @@ func (r Relayer) relay(denom string) error {
msg := gmptypes.NewMsgRelay(
r.cfg.Account.Address,
r.cfg.Relayer.Destination,
"0x001", // ojo contract address - empty
r.cfg.Relayer.Contract,
coins, // tokens we're paying with
[]string{denom},
"0x001", // ojo contract address - empty
coins, // tokens we're paying with
denoms, // tokens we're relaying
[]byte{}, // command selector - empty
[]byte{}, // params - empty, no callback
time.Now().Unix(), // unix timestamp
)

currentHeight, err := r.relayerClient.ChainHeight.GetChainHeight()
if err != nil {
return err
Expand Down
106 changes: 106 additions & 0 deletions relayer/relayer/relayer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package relayer

import (
"testing"
"time"
)

func TestHeartbeat(t *testing.T) {
tests := []struct {
name string
interval time.Duration
lastUpdate time.Time
want bool
}{
{
name: "Time since last update is less than interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-3 * time.Second),
want: false,
},
{
name: "Time since last update is equal to interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-5 * time.Second),
want: true,
},
{
name: "Time since last update is greater than interval",
interval: 5 * time.Second,
lastUpdate: time.Now().Add(-7 * time.Second),
want: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := heartbeat(tt.interval, tt.lastUpdate); got != tt.want {
t.Errorf("heartbeat() = %v, want %v", got, tt.want)
}
})
}
}

func TestDeviated(t *testing.T) {
tests := []struct {
name string
price float64
newPrice float64
threshold float64
wantDev float64
wantBool bool
}{
{
name: "No deviation",
price: 100.0,
newPrice: 100.0,
threshold: 10.0,
wantDev: 0.0,
wantBool: false,
},
{
name: "Deviation less than threshold",
price: 100.0,
newPrice: 105.0,
threshold: 0.10,
wantDev: 0.05,
wantBool: false,
},
{
name: "Deviation equal to threshold",
price: 100.0,
newPrice: 110.0,
threshold: 0.10,
wantDev: 0.10,
wantBool: true,
},
{
name: "Deviation greater than threshold",
price: 100.0,
newPrice: 115.0,
threshold: 0.10,
wantDev: 0.15,
wantBool: true,
},
{
name: "Existing price larger than new price",
price: 100.0,
newPrice: 85.0,
threshold: 0.10,
wantDev: 0.15,
wantBool: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotDev, gotBool := deviated(tt.price, tt.newPrice, tt.threshold)
if gotDev != tt.wantDev {
t.Errorf("deviated() gotDev = %v, want %v", gotDev, tt.wantDev)
}
if gotBool != tt.wantBool {
t.Errorf("deviated() gotBool = %v, want %v", gotBool, tt.wantBool)
}
})
}
}

0 comments on commit aeb2c90

Please sign in to comment.