Skip to content

Commit

Permalink
feat: Separate curve package into StableSwapNG and TwoCrytoOptimized …
Browse files Browse the repository at this point in the history
…pool types (#31)

* feat: Separate curve package into StableSwapNG and TwoCrytoOptimized pools

* pool types for curve exchange

* fix tco price
  • Loading branch information
rbajollari authored Sep 5, 2024
1 parent e45354e commit 577eaea
Show file tree
Hide file tree
Showing 9 changed files with 4,436 additions and 694 deletions.
1,346 changes: 673 additions & 673 deletions abi/curve/curve.go → abi/curve/stableswapng.go

Large diffs are not rendered by default.

3,517 changes: 3,517 additions & 0 deletions abi/curve/twocryptooptimized.go

Large diffs are not rendered by default.

43 changes: 37 additions & 6 deletions client/spot_prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@ func (c *Client) PollSpotPrices(pools []pool.Pool) {
c.logger.Info().Interface("Pancake spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
case pool.PoolCurve:
spotPrice = c.QueryCurveSpotPrice(p, blockNum)
c.logger.Info().Interface("Curve spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
if p.PoolType == pool.StableSwapNG {
spotPrice = c.QueryCurveStableSwapNGSpotPrice(p, blockNum)
c.logger.Info().Interface("Curve stableswapng spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
} else if p.PoolType == pool.TwocryptoOptimized {
spotPrice = c.QueryCurveTwoCryptoOptimizedSpotPrice(p, blockNum)
c.logger.Info().Interface("Curve twocryptooptimized spotPrice", spotPrice).Msg("spot price received")
c.indexer.AddPrice(spotPrice)
}
}
}
}
Expand Down Expand Up @@ -136,9 +142,9 @@ func (c *Client) QueryPancakeSpotPrice(p pool.Pool, blockNum uint64) indexer.Spo
}
}

// QueryCurveSpotPrice queries the spot price of a curve pool
func (c *Client) QueryCurveSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
curveCaller, err := curve.NewCurveCaller(common.HexToAddress(p.Address), c.ethClient)
// QueryCurveStableSwapNGSpotPrice queries the spot price of a curve stableswapng pool
func (c *Client) QueryCurveStableSwapNGSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
curveCaller, err := curve.NewStableSwapNGCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
Expand All @@ -160,3 +166,28 @@ func (c *Client) QueryCurveSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotP
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(poolPrice, 18),
}
}

// QueryCurveTwoCryptoOptimizedSpotPrice queries the spot price of a curve twocryptooptimized pool
func (c *Client) QueryCurveTwoCryptoOptimizedSpotPrice(p pool.Pool, blockNum uint64) indexer.SpotPrice {
curveCaller, err := curve.NewTwocryptoOptimizedCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
c.reportError(fmt.Errorf("error initializing %s pool caller: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}

// price comes inverted
poolPriceInverted, err := curveCaller.LastPrices(nil)
if err != nil {
c.reportError(fmt.Errorf("error getting %s token last price from pool: %w", p.ExchangePair(), err))
return indexer.SpotPrice{}
}
scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(36), nil)
poolPrice := new(big.Int).Quo(scale, poolPriceInverted)

return indexer.SpotPrice{
BlockNum: indexer.BlockNum(blockNum),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(poolPrice, 18),
}
}
83 changes: 72 additions & 11 deletions client/swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ func (c *Client) WatchSwapsAndRestart(p pool.Pool) {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
case pool.PoolCurve:
err := c.WatchCurveSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
if p.PoolType == pool.StableSwapNG {
err := c.WatchCurveStableSwapNGSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
} else if p.PoolType == pool.TwocryptoOptimized {
err := c.WatchCurveTwoCryptoOptimizedSwapEvent(p)
if err != nil {
c.reportError(fmt.Errorf("error watching %s swap events", p.ExchangePair()))
}
}
}
}
Expand Down Expand Up @@ -218,19 +225,19 @@ func (c *Client) WatchPancakeSwapEvent(p pool.Pool) error {
}
}

// WatchCurveSwapEvent watches for swap events on a curve pool
func (c *Client) WatchCurveSwapEvent(p pool.Pool) error {
curveCaller, err := curve.NewCurveCaller(common.HexToAddress(p.Address), c.ethClient)
// WatchCurveStableSwapNGSwapEvent watches for swap events on a curve StableSwapNG pool
func (c *Client) WatchCurveStableSwapNGSwapEvent(p pool.Pool) error {
curveCaller, err := curve.NewStableSwapNGCaller(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

curveFilterer, err := curve.NewCurveFilterer(common.HexToAddress(p.Address), c.ethClient)
curveFilterer, err := curve.NewStableSwapNGFilterer(common.HexToAddress(p.Address), c.ethClient)
if err != nil {
return err
}

eventSink := make(chan *curve.CurveTokenExchange)
eventSink := make(chan *curve.StableSwapNGTokenExchange)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := curveFilterer.WatchTokenExchange(opts, eventSink, nil)
Expand All @@ -255,9 +262,63 @@ func (c *Client) WatchCurveSwapEvent(p pool.Pool) error {
scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(36), nil)
poolPrice := new(big.Int).Quo(scale, poolPriceInverted)

swap := p.ConvertCurveEventToSwap(event, poolPrice)
spotPrice := p.ConvertCurveEventToSpotPrice(event, poolPrice)
c.logger.Info().Interface("curve swap", swap).Msg("curve swap event received")
swap := p.ConvertCurveStableSwapNGEventToSwap(event, poolPrice)
spotPrice := p.ConvertCurveStableSwapNGEventToSpotPrice(event, poolPrice)
c.logger.Info().Interface("curve stableswapng swap", swap).
Msg("curve stableswapng swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
}
}
}

// WatchCurveTwoCryptoOptimizedSwapEvent watches for swap events on a curve TwoCryptoOptimized pool
func (c *Client) WatchCurveTwoCryptoOptimizedSwapEvent(p pool.Pool) error {
curveCaller, err := curve.NewTwocryptoOptimizedCaller(
common.HexToAddress(p.Address),
c.ethClient,
)
if err != nil {
return err
}

curveFilterer, err := curve.NewTwocryptoOptimizedFilterer(
common.HexToAddress(p.Address),
c.ethClient,
)
if err != nil {
return err
}

eventSink := make(chan *curve.TwocryptoOptimizedTokenExchange)
opts := &bind.WatchOpts{Start: nil, Context: c.ctx}
c.logger.Info().Msgf("subscribing to %s swap events", p.ExchangePair())
subscription, err := curveFilterer.WatchTokenExchange(opts, eventSink, nil)
if err != nil {
return err
}

for {
select {
case <-c.ctx.Done():
c.logger.Info().Msgf("unsubscribing from %s swap events", p.ExchangePair())
subscription.Unsubscribe()
return nil
case err := <-subscription.Err():
return err
case event := <-eventSink:
// price comes inverted
poolPriceInverted, err := curveCaller.LastPrices(nil)
if err != nil {
return err
}
scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(36), nil)
poolPrice := new(big.Int).Quo(scale, poolPriceInverted)

swap := p.ConvertCurveTwoCryptoOptimizedEventToSwap(event, poolPrice)
spotPrice := p.ConvertCurveTwoCryptoOptimizedEventToSpotPrice(event, poolPrice)
c.logger.Info().Interface("curve twocryptooptimized swap", swap).
Msg("curve twocryptooptimized swap event received")
c.indexer.AddSwap(swap)
c.indexer.AddPrice(spotPrice)
}
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ func (c Config) Validate() (err error) {
if _, ok := pool.SupportedExchanges[exchange.Name]; !ok {
return fmt.Errorf("unsupported exchange: %s", exchange.Name)
}

// validate pool types of curve pools
if exchange.Name == pool.ExchangeCurve {
for _, p := range exchange.Pools {
if p.PoolType != pool.StableSwapNG && p.PoolType != pool.TwocryptoOptimized {
return fmt.Errorf("unsupported pool type for curve exchange: %s", p.PoolType)
}
}
}
}

return nil
Expand Down
65 changes: 65 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,68 @@ exchanges:
_, err = ParseConfig(tmpfile.Name())
assert.ErrorContains(t, err, "unsupported exchange: invalid exchange")
}

func TestValidPoolTypes(t *testing.T) {
tmpfile, err := os.CreateTemp("", "config_test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())

_, err = tmpfile.Write([]byte(`
exchanges:
- name: curve
node_urls:
- http://node1.com
pools:
- base: "WBTC"
quote: "WETH"
address: "testAddress1"
pool_type: "stableswapng"
- base: "WETH"
quote: "USDC"
address: "testAddress2"
pool_type: "twocryptooptimized"
server:
listen_addr: "http://localhost:8080"
`))
if err != nil {
t.Fatal(err)
}

if err := tmpfile.Close(); err != nil {
t.Fatal(err)
}
}

func TestInvalidPoolType(t *testing.T) {
tmpfile, err := os.CreateTemp("", "config_test")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())

_, err = tmpfile.Write([]byte(`
exchanges:
- name: curve
node_urls:
- http://node1.com
pools:
- base: "WBTC"
quote: "WETH"
address: "testAddress1"
pool_type: "invalid pool type"
server:
listen_addr: "http://localhost:8080"
`))
if err != nil {
t.Fatal(err)
}

if err := tmpfile.Close(); err != nil {
t.Fatal(err)
}

_, err = ParseConfig(tmpfile.Name())
assert.ErrorContains(t, err, "unsupported pool type for curve exchange: invalid pool type")
}
56 changes: 52 additions & 4 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Pool struct {
BaseDecimal uint64 `yaml:"base_decimal"`
QuoteDecimal uint64 `yaml:"quote_decimal"`
InvertPrice bool `yaml:"invert_price"`

// used for exchanges with multiple pool implementations
PoolType PoolType `yaml:"pool_type"`
}

func (p *Pool) ExchangePair() string {
Expand Down Expand Up @@ -105,7 +108,35 @@ func (p *Pool) ConvertPancakeEventToSwap(event *pancake.PancakeSwap) indexer.Swa
}
}

func (p *Pool) ConvertCurveEventToSpotPrice(event *curve.CurveTokenExchange, price *big.Int) indexer.SpotPrice {
func (p *Pool) ConvertCurveStableSwapNGEventToSpotPrice(
event *curve.StableSwapNGTokenExchange,
price *big.Int,
) indexer.SpotPrice {
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: p.SqrtPriceX96ToDec(price),
}
}

func (p *Pool) ConvertCurveStableSwapNGEventToSwap(
event *curve.StableSwapNGTokenExchange,
price *big.Int,
) indexer.Swap {
return indexer.Swap{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(price, 18),
Volume: p.swapCurveStableSwapNGVolume(event),
}
}

func (p *Pool) ConvertCurveTwoCryptoOptimizedEventToSpotPrice(
event *curve.TwocryptoOptimizedTokenExchange,
price *big.Int,
) indexer.SpotPrice {
return indexer.SpotPrice{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
Expand All @@ -114,13 +145,16 @@ func (p *Pool) ConvertCurveEventToSpotPrice(event *curve.CurveTokenExchange, pri
}
}

func (p *Pool) ConvertCurveEventToSwap(event *curve.CurveTokenExchange, price *big.Int) indexer.Swap {
func (p *Pool) ConvertCurveTwoCryptoOptimizedEventToSwap(
event *curve.TwocryptoOptimizedTokenExchange,
price *big.Int,
) indexer.Swap {
return indexer.Swap{
BlockNum: indexer.BlockNum(event.Raw.BlockNumber),
Timestamp: utils.CurrentUnixTime(),
ExchangePair: p.ExchangePair(),
Price: sdkmath.LegacyNewDecFromBigIntWithPrec(price, 18),
Volume: p.swapCurveVolume(event),
Volume: p.swapCurveTwoCryptoOptimizedVolume(event),
}
}

Expand Down Expand Up @@ -189,7 +223,21 @@ func (p *Pool) swapPancakeVolume(event *pancake.PancakeSwap) sdkmath.LegacyDec {
}
}

func (p *Pool) swapCurveVolume(event *curve.CurveTokenExchange) sdkmath.LegacyDec {
func (p *Pool) swapCurveStableSwapNGVolume(
event *curve.StableSwapNGTokenExchange,
) sdkmath.LegacyDec {
if p.InvertPrice {
volume := sdkmath.LegacyNewDecFromBigInt(event.TokensBought).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.QuoteDecimal)))
} else {
volume := sdkmath.LegacyNewDecFromBigInt(event.TokensSold).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.BaseDecimal)))
}
}

func (p *Pool) swapCurveTwoCryptoOptimizedVolume(
event *curve.TwocryptoOptimizedTokenExchange,
) sdkmath.LegacyDec {
if p.InvertPrice {
volume := sdkmath.LegacyNewDecFromBigInt(event.TokensBought).Abs()
return volume.Quo(sdkmath.LegacyNewDec(10).Power(uint64(p.QuoteDecimal)))
Expand Down
3 changes: 3 additions & 0 deletions pool/supported_exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

type ExchangeName string
type PoolContract string
type PoolType string

const ExchangeUniswap ExchangeName = "uniswap"
const ExchangeCamelot ExchangeName = "camelot"
Expand All @@ -13,6 +14,8 @@ const PoolAlgebra PoolContract = "algebrapool"
const PoolBalancer PoolContract = "balancerpool"
const PoolPancake PoolContract = "pancakepool"
const PoolCurve PoolContract = "curvepool"
const StableSwapNG PoolType = "stableswapng"
const TwocryptoOptimized PoolType = "twocryptooptimized"

// maps exchange to pool contract of that exchange
var SupportedExchanges = map[ExchangeName]PoolContract{
Expand Down
8 changes: 8 additions & 0 deletions sample-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ exchanges:
base_decimal: 18
quote_decimal: 18
invert_price: false
pool_type: "stableswapng"
- address: "0x19b8524665abac613d82ece5d8347ba44c714bdd"
base: "YNETH"
quote: "WSTETH"
base_decimal: 18
quote_decimal: 18
invert_price: false
pool_type: "twocryptooptimized"
server:
listen_addr: "0.0.0.0:5009"
write_timeout: "20s"
Expand Down

0 comments on commit 577eaea

Please sign in to comment.