Skip to content

Commit

Permalink
feat: Camelot indexer for MILKTIA/TIA pair (#16)
Browse files Browse the repository at this point in the history
* feat: Camelot indexer for MILKTIA/TIA pair

* workflows

* allow multiple exchanges in same program

* revert workflows

* update arbitrum ws url
  • Loading branch information
rbajollari authored May 24, 2024
1 parent 01b6474 commit 235c9da
Show file tree
Hide file tree
Showing 12 changed files with 3,121 additions and 115 deletions.
2,804 changes: 2,804 additions & 0 deletions abi/algebra_pool.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion abi/uniswap_pool.json
Original file line number Diff line number Diff line change
Expand Up @@ -985,4 +985,4 @@
"stateMutability": "view",
"type": "function"
}
]
]
9 changes: 6 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
const maxErrorsPerMinute = 10

type Client struct {
nodeUrl string
ethClient *ethclient.Client
indexer *indexer.Indexer
nodeUrl string
poolContract pool.PoolContract
ethClient *ethclient.Client
indexer *indexer.Indexer

ctx context.Context
cancelFunc context.CancelFunc
Expand All @@ -31,6 +32,7 @@ type Client struct {

func NewClient(
nodeUrl string,
poolContract pool.PoolContract,
indexer *indexer.Indexer,
mainCtx context.Context,
logger zerolog.Logger,
Expand All @@ -46,6 +48,7 @@ func NewClient(

return &Client{
nodeUrl: nodeUrl,
poolContract: poolContract,
ethClient: ethClient,
indexer: indexer,
ctx: ctx,
Expand Down
17 changes: 12 additions & 5 deletions client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/ojo-network/ethereum-api/config"
"github.com/ojo-network/ethereum-api/pool"
"github.com/ojo-network/indexer/indexer"
"github.com/rs/zerolog"
)
Expand All @@ -15,7 +16,7 @@ const sleepDurationAfterAllNodesFail = 2 * time.Minute
// in the order provided until a connection is established. It continues this process
// after receiving an error from the client until the context is cancelled.
func MaintainConnection(
cfg *config.Config,
exchange config.Exchange,
indexer *indexer.Indexer,
ctx context.Context,
logger zerolog.Logger,
Expand All @@ -27,15 +28,21 @@ func MaintainConnection(
case <-ctx.Done():
return
default:
if nodeIndex > len(cfg.NodeUrls)-1 {
if nodeIndex > len(exchange.NodeUrls)-1 {
time.Sleep(sleepDurationAfterAllNodesFail)
nodeIndex = 0
}
ethClient, err := NewClient(cfg.NodeUrls[nodeIndex], indexer, ctx, logger)
ethClient, err := NewClient(
exchange.NodeUrls[nodeIndex],
pool.SupportedExchanges[exchange.Name],
indexer,
ctx,
logger,
)
if err != nil {
logger.Error().Err(err).Msgf("error connecting to ethereum node %s", cfg.NodeUrls[nodeIndex])
logger.Error().Err(err).Msgf("error connecting to ethereum node %s", exchange.NodeUrls[nodeIndex])
} else {
ethClient.WatchSwapsAndPollPrices(cfg.Pools)
ethClient.WatchSwapsAndPollPrices(exchange.Pools)
}
nodeIndex++
}
Expand Down
38 changes: 33 additions & 5 deletions client/spot_prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@ func (c *Client) PollSpotPrices(pools []pool.Pool) {
return
case blockNum := <-c.newBlock:
for _, p := range pools {
spotPrice := c.QuerySpotPrice(p, blockNum)
c.logger.Info().Interface("spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
var spotPrice indexer.SpotPrice
switch c.poolContract {
case pool.PoolUniswap:
spotPrice = c.QueryUniswapSpotPrice(p, blockNum)
c.logger.Info().Interface("spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
case pool.PoolAlgebra:
spotPrice = c.QueryAlgebraSpotPrice(p, blockNum)
c.logger.Info().Interface("spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
}
}
}
}
}()
}

// QuerySpotPrice queries the spot price of a pool
func (c *Client) QuerySpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
// QueryUniswapSpotPrice queries the spot price of a uniswap pool
func (c *Client) QueryUniswapSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
poolCaller, err := abi.NewPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
Expand All @@ -47,3 +55,23 @@ func (c *Client) QuerySpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice
Price: p.SqrtPriceX96ToDec(slot0.SqrtPriceX96),
}
}

// QueryAlgebraSpotPrice queries the spot price of an alegbra pool
func (c *Client) QueryAlgebraSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
poolCaller, err := abi.NewAlgebraPoolCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}
globalState, err := poolCaller.GlobalState(nil)
if err != nil {
c.reportError(fmt.Errorf("error getting %s pool balance: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(blockNum),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: p.SqrtPriceX96ToDec(globalState.Price),
}
}
53 changes: 47 additions & 6 deletions client/swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@ func (c *Client) WatchSwapsAndRestart(p pool.Pool) {
case <-c.ctx.Done():
return
default:
err := c.WatchSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
switch c.poolContract {
case pool.PoolUniswap:
err := c.WatchUniswapSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
case pool.PoolAlgebra:
err := c.WatchAlgebraSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
}
}
}
}()
}

// WatchSwapEvent watches for swap events on a pool
func (c *Client) WatchSwapEvent(p pool.Pool) error {
// WatchUniswapSwapEvent watches for swap events on a uniswap pool
func (c *Client) WatchUniswapSwapEvent(p pool.Pool) error {
poolFilterer, err := abi.NewPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
Expand All @@ -52,7 +60,40 @@ func (c *Client) WatchSwapEvent(p pool.Pool) error {
case event := <-eventSink:
swap := p.ConvertEventToSwap(event)
spotPrice := p.ConvertEventToSpotPrice(event)
c.logger.Info().Interface("swap", swap).Msg("swap event received")
c.logger.Info().Interface("uniswap swap", swap).Msg("uniswap swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
}
}
}

// WatchAlgebraSwapEvent watches for swap events on an alegbra pool
func (c *Client) WatchAlgebraSwapEvent(p pool.Pool) error {
poolFilterer, err := abi.NewAlgebraPoolFilterer(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

eventSink := make(chan *abi.AlgebraPoolSwap)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := poolFilterer.WatchSwap(opts, eventSink, nil, nil)
if err != nil {
return err
}

for {
select {
case <-c.ctx.Done():
c.logger.Info().Msgf("unsubscribing from %s swap events", p.ExchangePair())
subscription.Unsubscribe()
return nil
case err := <-subscription.Err():
return err
case event := <-eventSink:
swap := p.ConvertAlgebraEventToSwap(event)
spotPrice := p.ConvertAlgebraEventToSpotPrice(event)
c.logger.Info().Interface("alegbra swap", swap).Msg("algebra swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
}
Expand Down
44 changes: 29 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package config

import (
"math/rand"
"fmt"
"os"

"github.com/ojo-network/ethereum-api/pool"
"github.com/ojo-network/indexer/server"
"gopkg.in/yaml.v3"
)

type Exchange struct {
Name pool.ExchangeName `yaml:"name"`
NodeUrls []string `yaml:"node_urls"`
Pools []pool.Pool `yaml:"pools"`
}

type Config struct {
NodeUrls []string `yaml:"node_urls"`
Server server.ServerConfig `yaml:"server"`
Pools []pool.Pool `yaml:"pools"`
Exchanges []Exchange `yaml:"exchanges"`
Server server.ServerConfig `yaml:"server"`
}

func ParseConfig(filePath string) (*Config, error) {
Expand All @@ -27,21 +32,30 @@ func ParseConfig(filePath string) (*Config, error) {
if err != nil {
return nil, err
}
return &config, nil
return &config, config.Validate()
}

// Validate returns an error if the Config object is invalid.
func (c Config) Validate() (err error) {
for _, exchange := range c.Exchanges{
if _, ok := pool.SupportedExchanges[exchange.Name]; !ok {
return fmt.Errorf("unsupported exchange: %s", exchange.Name)
}
}

return nil
}

func (c *Config) AssetPairs() []server.AssetPair {
var assetPairs []server.AssetPair
for _, pool := range c.Pools {
assetPairs = append(assetPairs, server.AssetPair{
Base: pool.Base,
Quote: pool.Quote,
})
for _, exchange := range c.Exchanges {
for _, pool := range exchange.Pools {
assetPairs = append(assetPairs, server.AssetPair{
Base: pool.Base,
Quote: pool.Quote,
})
}
}
return assetPairs
}

func (c *Config) RandomNodeUrl() string {
index := rand.Intn(len(c.NodeUrls))
return c.NodeUrls[index]
return assetPairs
}
88 changes: 69 additions & 19 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"testing"

"github.com/ojo-network/ethereum-api/pool"
"github.com/stretchr/testify/assert"
)

Expand All @@ -15,20 +16,23 @@ func TestParseConfig(t *testing.T) {
defer os.Remove(tmpfile.Name())

_, err = tmpfile.Write([]byte(`
---
node_urls: [
http://node1.com,
http://node2.com
]
exchanges:
- name: uniswap
node_urls:
- http://node1.com
pools:
- base: "WBTC"
quote: "WETH"
address: "testAddress1"
- name: camelot
node_urls:
- http://node2.com
pools:
- base: "WETH"
quote: "USDC"
address: "testAddress2"
server:
listen_addr: "http://localhost:8080"
pools:
- base: "WBTC"
quote: "WETH"
address: "testAddress1"
- base: "WETH"
quote: "USDC"
address: "testAddress2"
`))
if err != nil {
t.Fatal(err)
Expand All @@ -44,13 +48,59 @@ pools:
}

// Assert that the parsed config matches the expected values
assert.Equal(t, "http://node1.com", config.NodeUrls[0])
assert.Equal(t, "http://node2.com", config.NodeUrls[1])
assert.Equal(t, pool.ExchangeUniswap, config.Exchanges[0].Name)
assert.Equal(t, pool.ExchangeCamelot, config.Exchanges[1].Name)

assert.Equal(t, "http://node1.com", config.Exchanges[0].NodeUrls[0])
assert.Equal(t, "http://node2.com", config.Exchanges[1].NodeUrls[0])

assert.Equal(t, 1, len(config.Exchanges[0].Pools))
assert.Equal(t, 1, len(config.Exchanges[1].Pools))

assert.Equal(t, "WBTC/WETH", config.Exchanges[0].Pools[0].ExchangePair())
assert.Equal(t, "testAddress1", config.Exchanges[0].Pools[0].Address)

assert.Equal(t, "WETH/USDC", config.Exchanges[1].Pools[0].ExchangePair())
assert.Equal(t, "testAddress2", config.Exchanges[1].Pools[0].Address)

assert.Equal(t, "http://localhost:8080", config.Server.ListenAddr)
assert.Equal(t, 2, len(config.Pools))
assert.Equal(t, "WBTC/WETH", config.Pools[0].ExchangePair())
assert.Equal(t, "testAddress1", config.Pools[0].Address)
assert.Equal(t, "WETH/USDC", config.Pools[1].ExchangePair())
assert.Equal(t, "testAddress2", config.Pools[1].Address)
}


func TestInvalidExchanges(t *testing.T) {
tmpfile, err := os.CreateTemp("", "config_test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())

_, err = tmpfile.Write([]byte(`
exchanges:
- name: invalid exchange
node_urls:
- http://node1.com
pools:
- base: "WBTC"
quote: "WETH"
address: "testAddress1"
- name: camelot
node_urls:
- http://node2.com
pools:
- base: "WETH"
quote: "USDC"
address: "testAddress2"
server:
listen_addr: "http://localhost:8080"
`))
if err != nil {
t.Fatal(err)
}

if err := tmpfile.Close(); err != nil {
t.Fatal(err)
}

_, err = ParseConfig(tmpfile.Name())
assert.ErrorContains(t, err, "unsupported exchange: invalid exchange")
}
Loading

0 comments on commit 235c9da

Please sign in to comment.