Skip to content

Commit

Permalink
feat: Add balancer exchange (#18)
Browse files Browse the repository at this point in the history
* feat: Add balancer exchange

* spot price

* fix price

* update log
  • Loading branch information
rbajollari authored Jun 24, 2024
1 parent 235c9da commit 8ef31ab
Show file tree
Hide file tree
Showing 11 changed files with 6,213 additions and 33 deletions.
3,214 changes: 3,214 additions & 0 deletions abi/balancer/pool/composable_stable_pool.go

Large diffs are not rendered by default.

2,826 changes: 2,826 additions & 0 deletions abi/balancer/vault/vault.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion abi/algebra_pool.go → abi/camelot/algebra_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion abi/uniswap_pool.go → abi/uniswap/uniswap_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 31 additions & 3 deletions client/spot_prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package client
import (
"fmt"

sdkmath "cosmossdk.io/math"

"github.com/ethereum/go-ethereum/common"
"github.com/ojo-network/ethereum-api/abi"
balancerpool "github.com/ojo-network/ethereum-api/abi/balancer/pool"
"github.com/ojo-network/ethereum-api/abi/camelot"
"github.com/ojo-network/ethereum-api/abi/uniswap"
"github.com/ojo-network/ethereum-api/pool"
"github.com/ojo-network/indexer/indexer"
"github.com/ojo-network/indexer/utils"
Expand All @@ -29,6 +33,10 @@ func (c *Client) PollSpotPrices(pools []pool.Pool) {
spotPrice = c.QueryAlgebraSpotPrice(p, blockNum)
c.logger.Info().Interface("spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
case pool.PoolBalancer:
spotPrice = c.QueryBalancerSpotPrice(p, blockNum)
c.logger.Info().Interface("spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
}
}
}
Expand All @@ -38,7 +46,7 @@ func (c *Client) PollSpotPrices(pools []pool.Pool) {

// QueryUniswapSpotPrice queries the spot price of a uniswap pool
func (c *Client) QueryUniswapSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
poolCaller, err := abi.NewPoolCaller(common.HexToAddress(p.Address), c.ethClient)
poolCaller, err := uniswap.NewPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
Expand All @@ -58,7 +66,7 @@ func (c *Client) QueryUniswapSpotPrice(p pool.Pool, blockNum uint64) indexer.Spo

// QueryAlgebraSpotPrice queries the spot price of an alegbra pool
func (c *Client) QueryAlgebraSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
poolCaller, err := abi.NewAlgebraPoolCaller(common.HexToAddress(p.Address), c.ethClient)
poolCaller, err := camelot.NewAlgebraPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
Expand All @@ -75,3 +83,23 @@ func (c *Client) QueryAlgebraSpotPrice(p pool.Pool, blockNum uint64) indexer.Spo
Price: p.SqrtPriceX96ToDec(globalState.Price),
}
}

// QueryBalancerSpotPrice queries the spot price of a balancer pool
func (c *Client) QueryBalancerSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
poolCaller, err := balancerpool.NewPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}
poolRate, err := poolCaller.GetRate(nil)
if err != nil {
c.reportError(fmt.Errorf("error getting %s pool balance: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(blockNum),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(poolRate, 18),
}
}
94 changes: 87 additions & 7 deletions client/swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ojo-network/ethereum-api/abi"
balancerpool "github.com/ojo-network/ethereum-api/abi/balancer/pool"
"github.com/ojo-network/ethereum-api/abi/balancer/vault"
"github.com/ojo-network/ethereum-api/abi/camelot"
"github.com/ojo-network/ethereum-api/abi/uniswap"
"github.com/ojo-network/ethereum-api/pool"
)

Expand All @@ -28,6 +31,11 @@ func (c *Client) WatchSwapsAndRestart(p pool.Pool) {
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
case pool.PoolBalancer:
err := c.WatchBalancerSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
}
}
}
Expand All @@ -36,12 +44,12 @@ func (c *Client) WatchSwapsAndRestart(p pool.Pool) {

// WatchUniswapSwapEvent watches for swap events on a uniswap pool
func (c *Client) WatchUniswapSwapEvent(p pool.Pool) error {
poolFilterer, err := abi.NewPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
poolFilterer, err := uniswap.NewPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

eventSink := make(chan *abi.PoolSwap)
eventSink := make(chan *uniswap.PoolSwap)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := poolFilterer.WatchSwap(opts, eventSink, nil, nil)
Expand All @@ -58,8 +66,8 @@ func (c *Client) WatchUniswapSwapEvent(p pool.Pool) error {
case err := <-subscription.Err():
return err
case event := <-eventSink:
swap := p.ConvertEventToSwap(event)
spotPrice := p.ConvertEventToSpotPrice(event)
swap := p.ConvertUniswapEventToSwap(event)
spotPrice := p.ConvertUniswapEventToSpotPrice(event)
c.logger.Info().Interface("uniswap swap", swap).Msg("uniswap swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
Expand All @@ -69,12 +77,12 @@ func (c *Client) WatchUniswapSwapEvent(p pool.Pool) error {

// WatchAlgebraSwapEvent watches for swap events on an alegbra pool
func (c *Client) WatchAlgebraSwapEvent(p pool.Pool) error {
poolFilterer, err := abi.NewAlgebraPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
poolFilterer, err := camelot.NewAlgebraPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

eventSink := make(chan *abi.AlgebraPoolSwap)
eventSink := make(chan *camelot.AlgebraPoolSwap)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := poolFilterer.WatchSwap(opts, eventSink, nil, nil)
Expand All @@ -99,3 +107,75 @@ func (c *Client) WatchAlgebraSwapEvent(p pool.Pool) error {
}
}
}

// WatchBalancerSwapEvent watches for swap events on an balancer vault
func (c *Client) WatchBalancerSwapEvent(p pool.Pool) error {
// retreive vault address that supports this pool and pool id
poolCaller, err := balancerpool.NewPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

vaultAddress, err := poolCaller.GetVault(nil)
if err != nil {
return err
}
poolId, err := poolCaller.GetPoolId(nil)
if err != nil {
return err
}

// retreive token addresses of pool
vaultCaller, err := vault.NewPoolCaller(vaultAddress, c.ethClient)
if err != nil {
return err
}
poolTokens, err := vaultCaller.GetPoolTokens(nil, poolId)
if err != nil {
return err
}

// build parameters for swap event subscription
poolIdParam := make([][32]byte, 1)
poolIdParam[0] = poolId
tokenInParam := make([]common.Address, 1)
tokenInParam[0] = poolTokens.Tokens[0]
tokenOutParam := make([]common.Address, 1)
tokenOutParam[0] = poolTokens.Tokens[1]

vaultFilterer, err := vault.NewPoolFilterer(vaultAddress, c.ethClient)
if err != nil {
return err
}

eventSink := make(chan *vault.PoolSwap)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := vaultFilterer.WatchSwap(opts, eventSink, poolIdParam, tokenInParam, tokenOutParam)
if err != nil {
return err
}

for {
select {
case <-c.ctx.Done():
c.logger.Info().Msgf("unsubscribing from %s swap events", p.ExchangePair())
subscription.Unsubscribe()
return nil
case err := <-subscription.Err():
return err
case event := <-eventSink:
// query rate from pool contract
poolRate, err := poolCaller.GetRate(nil)
if err != nil {
return err
}

swap := p.ConvertBalancerEventToSwap(event, poolRate)
spotPrice := p.ConvertBalancerEventToSpotPrice(event, poolRate)
c.logger.Info().Interface("balancer swap", swap).Msg("algebra swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
}
}
}
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ParseConfig(filePath string) (*Config, error) {

// Validate returns an error if the Config object is invalid.
func (c Config) Validate() (err error) {
for _, exchange := range c.Exchanges{
for _, exchange := range c.Exchanges {
if _, ok := pool.SupportedExchanges[exchange.Name]; !ok {
return fmt.Errorf("unsupported exchange: %s", exchange.Name)
}
Expand Down
1 change: 0 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ server:
assert.Equal(t, "http://localhost:8080", config.Server.ListenAddr)
}


func TestInvalidExchanges(t *testing.T) {
tmpfile, err := os.CreateTemp("", "config_test")
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func main() {
client.MaintainConnection(exchange, i, ctx, logger)
}


// Create new websocket and REST server
s, err := server.NewServer(logger, cfg.Server, cfg.AssetPairs())
if err != nil {
Expand Down
59 changes: 45 additions & 14 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@ import (

sdkmath "cosmossdk.io/math"

"github.com/ojo-network/ethereum-api/abi"
"github.com/ojo-network/ethereum-api/abi/balancer/vault"
"github.com/ojo-network/ethereum-api/abi/camelot"
"github.com/ojo-network/ethereum-api/abi/uniswap"
"github.com/ojo-network/indexer/indexer"
"github.com/ojo-network/indexer/utils"
)

type Pool struct {
Address string `yaml:"address"`
Base string `yaml:"base"`
Quote string `yaml:"quote"`
BaseDecimal uint64 `yaml:"base_decimal"`
QuoteDecimal uint64 `yaml:"quote_decimal"`
InvertPrice bool `yaml:"invert_price"`
Address string `yaml:"address"`
Base string `yaml:"base"`
Quote string `yaml:"quote"`
BaseDecimal uint64 `yaml:"base_decimal"`
QuoteDecimal uint64 `yaml:"quote_decimal"`
InvertPrice bool `yaml:"invert_price"`
}

func (p *Pool) ExchangePair() string {
return p.Base + "/" + p.Quote
}

func (p *Pool) ConvertEventToSpotPrice(event *abi.PoolSwap) indexer.SpotPrice {
func (p *Pool) ConvertUniswapEventToSpotPrice(event *uniswap.PoolSwap) indexer.SpotPrice {
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
Expand All @@ -32,17 +34,17 @@ func (p *Pool) ConvertEventToSpotPrice(event *abi.PoolSwap) indexer.SpotPrice {
}
}

func (p *Pool) ConvertEventToSwap(event *abi.PoolSwap) indexer.Swap {
func (p *Pool) ConvertUniswapEventToSwap(event *uniswap.PoolSwap) indexer.Swap {
return indexer.Swap{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: p.SqrtPriceX96ToDec(event.SqrtPriceX96),
Volume: p.swapVolume(event),
Volume: p.swapUniswapVolume(event),
}
}

func (p *Pool) ConvertAlgebraEventToSpotPrice(event *abi.AlgebraPoolSwap) indexer.SpotPrice {
func (p *Pool) ConvertAlgebraEventToSpotPrice(event *camelot.AlgebraPoolSwap) indexer.SpotPrice {
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
Expand All @@ -51,7 +53,7 @@ func (p *Pool) ConvertAlgebraEventToSpotPrice(event *abi.AlgebraPoolSwap) indexe
}
}

func (p *Pool) ConvertAlgebraEventToSwap(event *abi.AlgebraPoolSwap) indexer.Swap {
func (p *Pool) ConvertAlgebraEventToSwap(event *camelot.AlgebraPoolSwap) indexer.Swap {
return indexer.Swap{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
Expand All @@ -61,6 +63,25 @@ func (p *Pool) ConvertAlgebraEventToSwap(event *abi.AlgebraPoolSwap) indexer.Swa
}
}

func (p *Pool) ConvertBalancerEventToSpotPrice(event *vault.PoolSwap, price *big.Int) indexer.SpotPrice {
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(price, 18),
}
}

func (p *Pool) ConvertBalancerEventToSwap(event *vault.PoolSwap, price *big.Int) indexer.Swap {
return indexer.Swap{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: p.SqrtPriceX96ToDec(price),
Volume: p.swapBalancerVolume(event),
}
}

func (p *Pool) SqrtPriceX96ToDec(sqrtPriceX96 *big.Int) sdkmath.LegacyDec {
sdkValue := sdkmath.LegacyNewDecFromBigInt(sqrtPriceX96)

Expand All @@ -86,7 +107,7 @@ func (p *Pool) SqrtPriceX96ToDec(sqrtPriceX96 *big.Int) sdkmath.LegacyDec {
}
}

func (p *Pool) swapVolume(event *abi.PoolSwap) sdkmath.LegacyDec {
func (p *Pool) swapUniswapVolume(event *uniswap.PoolSwap) sdkmath.LegacyDec {
if p.InvertPrice {
volume := sdkmath.LegacyNewDecFromBigInt(event.Amount1).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.QuoteDecimal)))
Expand All @@ -96,7 +117,7 @@ func (p *Pool) swapVolume(event *abi.PoolSwap) sdkmath.LegacyDec {
}
}

func (p *Pool) swapAlgebraVolume(event *abi.AlgebraPoolSwap) sdkmath.LegacyDec {
func (p *Pool) swapAlgebraVolume(event *camelot.AlgebraPoolSwap) sdkmath.LegacyDec {
if p.InvertPrice {
volume := sdkmath.LegacyNewDecFromBigInt(event.Amount1).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.QuoteDecimal)))
Expand All @@ -105,3 +126,13 @@ func (p *Pool) swapAlgebraVolume(event *abi.AlgebraPoolSwap) sdkmath.LegacyDec {
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.BaseDecimal)))
}
}

func (p *Pool) swapBalancerVolume(event *vault.PoolSwap) sdkmath.LegacyDec {
if p.InvertPrice {
volume := sdkmath.LegacyNewDecFromBigInt(event.AmountOut).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.QuoteDecimal)))
} else {
volume := sdkmath.LegacyNewDecFromBigInt(event.AmountIn).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.BaseDecimal)))
}
}
11 changes: 7 additions & 4 deletions pool/supported_exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ type PoolContract string

const ExchangeUniswap ExchangeName = "uniswap"
const ExchangeCamelot ExchangeName = "camelot"
const PoolUniswap PoolContract = "uniswappool"
const PoolAlgebra PoolContract = "algebrapool"
const ExchangeBalancer ExchangeName = "balancer"
const PoolUniswap PoolContract = "uniswappool"
const PoolAlgebra PoolContract = "algebrapool"
const PoolBalancer PoolContract = "balancerpool"

// maps exchange to pool contract of that exchange
var SupportedExchanges = map[ExchangeName]PoolContract{
ExchangeUniswap: PoolUniswap,
ExchangeCamelot: PoolAlgebra,
ExchangeUniswap: PoolUniswap,
ExchangeCamelot: PoolAlgebra,
ExchangeBalancer: PoolBalancer,
}

0 comments on commit 8ef31ab

Please sign in to comment.