diff --git a/cmd/app/flags/flags.go b/cmd/app/flags/flags.go index fedc80e..93b75a8 100644 --- a/cmd/app/flags/flags.go +++ b/cmd/app/flags/flags.go @@ -7,9 +7,7 @@ import ( const ( NetworkFlagName = "network" - L1RpcUrlFlagName = "l1-rpc" L1WsRpcUrlFlagName = "l1-ws-rpc" - L2RpcUrlFlagName = "l2-rpc" L2WsRpcUrlFlagName = "l2-ws-rpc" L1StandardBridgeFlagName = "l1-standard-bridge-address" L2StandardBridgeFlagName = "l2-standard-bridge-address" @@ -18,8 +16,9 @@ const ( SlackUrlFlagName = "slack-url" L1ExplorerUrlFlagName = "l1-explorer-url" L2ExplorerUrlFlagName = "l2-explorer-url" - OffFlagName = "slack-on-off" - TokenAddressesFlagName = "token-addresses" + L1TokenAddresses = "l1-token-addresses" + L2TokenAddresses = "l2-token-addresses" + RedisAddressFlagName = "redis-address" ) var ( @@ -28,24 +27,12 @@ var ( Usage: "Network name", EnvVars: []string{"NETWORK"}, } - L1RpcFlag = &cli.StringFlag{ - Name: L1RpcUrlFlagName, - Usage: "L1 RPC url", - Value: "http://localhost:8545", - EnvVars: []string{"L1_RPC"}, - } L1WsRpcFlag = &cli.StringFlag{ Name: L1WsRpcUrlFlagName, Usage: "L1 RPC url", Value: "ws://localhost:8546", EnvVars: []string{"L1_WS_RPC"}, } - L2RPCFlag = &cli.StringFlag{ - Name: L2RpcUrlFlagName, - Usage: "L2 RPC url", - Value: "http://localhost:9545", - EnvVars: []string{"L2_RPC"}, - } L2WsRpcFlag = &cli.StringFlag{ Name: L2WsRpcUrlFlagName, Usage: "L2 Ws RPC url", @@ -88,24 +75,28 @@ var ( Usage: "L2 explorer url", EnvVars: []string{"L2_EXPLORER_URL"}, } - OffFlag = &cli.BoolFlag{ - Name: OffFlagName, - Usage: "Slack active", - EnvVars: []string{"OFF"}, + L1TokenAddressesFlag = &cli.StringSliceFlag{ + Name: L1TokenAddresses, + Usage: "List of L1 tokens address to get symbol and decimals", + EnvVars: []string{"L1_TOKEN_ADDRESSES"}, + } + L2TokenAddressesFlag = &cli.StringSliceFlag{ + Name: L2TokenAddresses, + Usage: "List of L2 tokens address to get symbol and decimals", + EnvVars: []string{"L2_TOKEN_ADDRESSES"}, } - TokenAddressesFlag = &cli.StringSliceFlag{ - Name: TokenAddressesFlagName, - Usage: "List of addresses to get symbol and decimals", - EnvVars: []string{"TOKEN_ADDRESSES"}, + RedisAddressFlag = &cli.StringFlag{ + Name: RedisAddressFlagName, + EnvVars: []string{ + "REDIS_ADDRESS", + }, } ) func Flags() []cli.Flag { return []cli.Flag{ NetworkFlag, - L1RpcFlag, L1WsRpcFlag, - L2RPCFlag, L2WsRpcFlag, L1StandardBridgeFlag, L2StandardBridgeFlag, @@ -114,7 +105,8 @@ func Flags() []cli.Flag { SlackUrlFlag, L1ExplorerUrlFlag, L2ExplorerUrlFlag, - OffFlag, - TokenAddressesFlag, + L1TokenAddressesFlag, + L2TokenAddressesFlag, + RedisAddressFlag, } } diff --git a/cmd/app/main.go b/cmd/app/main.go index ab02c1a..63c0bd0 100644 --- a/cmd/app/main.go +++ b/cmd/app/main.go @@ -3,11 +3,12 @@ package main import ( "os" + "github.com/urfave/cli/v2" + "github.com/tokamak-network/tokamak-thanos-event-listener/cmd/app/flags" thanosnotif "github.com/tokamak-network/tokamak-thanos-event-listener/internal/app/thanos-notif" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/redis" "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" - - "github.com/urfave/cli/v2" ) func main() { @@ -24,7 +25,7 @@ func main() { }, } if err := app.Run(os.Args); err != nil { - log.GetLogger().Fatalw("Failed to start the application", "err", err) + log.GetLogger().Fatalw("Failed to start the application", "error", err) } } @@ -33,9 +34,7 @@ func startListener(ctx *cli.Context) error { config := &thanosnotif.Config{ Network: ctx.String(flags.NetworkFlagName), - L1Rpc: ctx.String(flags.L1RpcUrlFlagName), L1WsRpc: ctx.String(flags.L1WsRpcUrlFlagName), - L2Rpc: ctx.String(flags.L2RpcUrlFlagName), L2WsRpc: ctx.String(flags.L2WsRpcUrlFlagName), L1StandardBridge: ctx.String(flags.L1StandardBridgeFlagName), L2StandardBridge: ctx.String(flags.L2StandardBridgeFlagName), @@ -44,13 +43,24 @@ func startListener(ctx *cli.Context) error { SlackURL: ctx.String(flags.SlackUrlFlagName), L1ExplorerUrl: ctx.String(flags.L1ExplorerUrlFlagName), L2ExplorerUrl: ctx.String(flags.L2ExplorerUrlFlagName), - OFF: ctx.Bool(flags.OffFlagName), - TokenAddresses: ctx.StringSlice(flags.TokenAddressesFlagName), + L1TokenAddresses: ctx.StringSlice(flags.L1TokenAddresses), + L2TokenAddresses: ctx.StringSlice(flags.L2TokenAddresses), + RedisConfig: redis.Config{ + Addresses: ctx.String(flags.RedisAddressFlagName), + }, + } + + if err := config.Validate(); err != nil { + log.GetLogger().Fatalw("Failed to start the application", "error", err) } log.GetLogger().Infow("Set up configuration", "config", config) - app := thanosnotif.New(config) + app, err := thanosnotif.New(ctx.Context, config) + if err != nil { + log.GetLogger().Errorw("Failed to start the application", "error", err) + return err + } - return app.Start() + return app.Start(ctx.Context) } diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..f6a09de --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,13 @@ +version: '3.8' + +services: + redis: + image: redis:6 + container_name: redis + ports: + - "6379:6379" + volumes: + - redis_data:/data + +volumes: + redis_data: diff --git a/go.mod b/go.mod index 6b9453e..12477fb 100644 --- a/go.mod +++ b/go.mod @@ -6,21 +6,27 @@ require ( github.com/bits-and-blooms/bloom/v3 v3.7.0 github.com/ethereum-optimism/optimism/op-bindings v0.10.14 github.com/ethereum/go-ethereum v1.14.0 + github.com/go-redis/redis/v8 v8.11.5 + github.com/stretchr/testify v1.8.4 github.com/tokamak-network/tokamak-thanos v0.0.0-20240704090822-2d66a7cf788f github.com/urfave/cli/v2 v2.27.1 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.7.0 ) require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.3 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20240222155908-ab073f6aa74f // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -29,6 +35,7 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -41,7 +48,6 @@ require ( golang.org/x/crypto v0.22.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/tools v0.21.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/go.sum b/go.sum index 4c0f833..7ce87b3 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/ethereum-optimism/optimism/op-bindings v0.10.14 h1:SMMnMdNb1QIhJDyvk7QMUv+crAP4UHHoSYBOASBDIjM= github.com/ethereum-optimism/optimism/op-bindings v0.10.14/go.mod h1:9ZSUq/rjlzp3uYyBN4sZmhTc3oZgDVqJ4wrUja7vj6c= github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20240222155908-ab073f6aa74f h1:L2ub0d0iW2Nqwh1r9WxMqebgZf7rU+wHuVCv21uAGx8= @@ -69,6 +71,8 @@ github.com/getsentry/sentry-go v0.18.0/go.mod h1:Kgon4Mby+FJ7ZWHFUAZgVaIa8sxHtnR github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -77,6 +81,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8 h1:Ep/joEub9YwcjRY6ND3+Y/w0ncE540RtGatVhtZL0/Q= github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= @@ -122,8 +128,14 @@ github.com/mitchellh/pointerstructure v1.2.1/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8oh github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo= +github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -182,6 +194,8 @@ golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -203,6 +217,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/app/thanos-notif/app.go b/internal/app/thanos-notif/app.go index 5a1524d..c2d7ec5 100644 --- a/internal/app/thanos-notif/app.go +++ b/internal/app/thanos-notif/app.go @@ -1,14 +1,18 @@ package thanosnotif import ( + "context" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/tokamak-network/tokamak-thanos/op-bindings/bindings" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/redis" + "golang.org/x/sync/errgroup" + redislib "github.com/go-redis/redis/v8" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/listener" "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/notification" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/repository" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" ) @@ -21,508 +25,154 @@ const ( WithdrawalInitiatedEventABI = "WithdrawalInitiated(address,address,address,address,uint256,bytes)" ) -type Notifier interface { - NotifyWithReTry(title string, text string) - Notify(title string, text string) error - Enable() - Disable() -} - type App struct { - cfg *Config - notifier Notifier - tonAddress string - tokenInfo map[string]TokenInfo + cfg *Config + l1TokensInfo map[string]*types.Token + l2TokensInfo map[string]*types.Token + l1Listener *listener.EventService + l2Listener *listener.EventService + l1Client *bcclient.Client + l2Client *bcclient.Client } -func (app *App) ETHDepEvent(vLog *types.Log) { - log.GetLogger().Infow("Got ETH Deposit Event", "event", vLog) - - l1BridgeFilterer, _, err := app.getBridgeFilterers() +func New(ctx context.Context, cfg *Config) (*App, error) { + redisClient, err := redis.New(ctx, cfg.RedisConfig) if err != nil { - return + log.GetLogger().Errorw("Failed to connect to redis", "error", err) + return nil, err } - event, err := l1BridgeFilterer.ParseETHDepositInitiated(*vLog) - if err != nil { - log.GetLogger().Errorw("ETHDepositInitiated event parsing fail", "error", err) - return - } - - ethDep := bindings.L1StandardBridgeETHDepositInitiated{ - From: event.From, - To: event.To, - Amount: event.Amount, - } - - Amount := app.formatAmount(ethDep.Amount, 18) - - // Slack notify title and text - title := fmt.Sprintf("[" + app.cfg.Network + "] [ETH Deposit Initialized]") - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nAmount: %s ETH", vLog.TxHash, ethDep.From, ethDep.To, Amount) - - app.notifier.Notify(title, text) -} - -func (app *App) ETHWithEvent(vLog *types.Log) { - log.GetLogger().Infow("Got ETH Withdrawal Event", "event", vLog) - - l1BridgeFilterer, _, err := app.getBridgeFilterers() + l1Client, err := bcclient.New(ctx, cfg.L1WsRpc) if err != nil { - return + log.GetLogger().Errorw("Failed to create L1 client", "error", err) + return nil, err } - event, err := l1BridgeFilterer.ParseETHWithdrawalFinalized(*vLog) + l2Client, err := bcclient.New(ctx, cfg.L2WsRpc) if err != nil { - log.GetLogger().Errorw("ETHWithdrawalFinalized event log parsing fail", "error", err) - return + log.GetLogger().Errorw("Failed to create L2 client", "error", err) + return nil, err } - ethWith := bindings.L1StandardBridgeETHWithdrawalFinalized{ - From: event.From, - To: event.To, - Amount: event.Amount, - } - - Amount := app.formatAmount(ethWith.Amount, 18) - - // Slack notify title and text - title := fmt.Sprintf("[" + app.cfg.Network + "] [ETH Withdrawal Finalized]") - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nAmount: %s ETH", vLog.TxHash, ethWith.From, ethWith.To, Amount) - - app.notifier.Notify(title, text) -} - -func (app *App) ERC20DepEvent(vLog *types.Log) { - log.GetLogger().Infow("Got ERC20 Deposit Event", "event", vLog) - - l1BridgeFilterer, _, err := app.getBridgeFilterers() + l1Tokens, err := fetchTokensInfo(l1Client, cfg.L1TokenAddresses) if err != nil { - return + log.GetLogger().Errorw("Failed to fetch L1 tokens info", "error", err) + return nil, err } - event, err := l1BridgeFilterer.ParseERC20DepositInitiated(*vLog) + l2Tokens, err := fetchTokensInfo(l2Client, cfg.L2TokenAddresses) if err != nil { - log.GetLogger().Errorw("ERC20DepositInitiated event parsing fail", "error", err) - return + log.GetLogger().Errorw("Failed to fetch L2 tokens info", "error", err) + return nil, err } - erc20Dep := bindings.L1StandardBridgeERC20DepositInitiated{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } - - // get symbol and decimals - tokenAddress := erc20Dep.L1Token - tokenInfo, found := app.tokenInfo[tokenAddress.Hex()] - if !found { - log.GetLogger().Errorw("Token info not found for address", "tokenAddress", tokenAddress.Hex()) - return - } - - tokenSymbol := tokenInfo.Symbol - tokenDecimals := tokenInfo.Decimals - - Amount := app.formatAmount(erc20Dep.Amount, tokenDecimals) - - // Slack notify title and text - var title string - - isTON := tokenAddress.Cmp(common.HexToAddress(app.tonAddress)) == 0 - - if isTON { - title = fmt.Sprintf("[" + app.cfg.Network + "] [TON Deposit Initialized]") - } else { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ERC-20 Deposit Initialized]") + app := &App{ + cfg: cfg, + l1TokensInfo: l1Tokens, + l2TokensInfo: l2Tokens, + l1Client: l1Client, + l2Client: l2Client, } - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, erc20Dep.From, erc20Dep.To, erc20Dep.L1Token, erc20Dep.L2Token, Amount, tokenSymbol) - app.notifier.Notify(title, text) -} + slackNotifier := notification.MakeSlackNotificationService(cfg.SlackURL, 5) -func (app *App) ERC20WithEvent(vLog *types.Log) { - log.GetLogger().Infow("Got ERC20 Withdrawal Event", "event", vLog) - - l1BridgeFilterer, _, err := app.getBridgeFilterers() + l1Listener, err := app.initL1Listener(ctx, slackNotifier, l1Client, redisClient) if err != nil { - return + log.GetLogger().Errorw("Failed to initialize L1 listener", "error", err) + return nil, err } - event, err := l1BridgeFilterer.ParseERC20WithdrawalFinalized(*vLog) + l2Listener, err := app.initL2Listener(ctx, slackNotifier, l2Client, redisClient) if err != nil { - log.GetLogger().Errorw("ERC20WithdrawalFinalized event parsing fail", "error", err) - return + log.GetLogger().Errorw("Failed to initialize L2 listener", "error", err) + return nil, err } - erc20With := bindings.L1StandardBridgeERC20WithdrawalFinalized{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } - - // get symbol and decimals - tokenAddress := erc20With.L1Token - tokenInfo, found := app.tokenInfo[tokenAddress.Hex()] - if !found { - log.GetLogger().Errorw("Token info not found for address", "tokenAddress", tokenAddress.Hex()) - return - } - - tokenSymbol := tokenInfo.Symbol - tokenDecimals := tokenInfo.Decimals - - Amount := app.formatAmount(erc20With.Amount, tokenDecimals) + app.l1Listener = l1Listener + app.l2Listener = l2Listener - // Slack notify title and text - var title string - - isTON := tokenAddress.Cmp(common.HexToAddress(app.tonAddress)) == 0 - - if isTON { - title = fmt.Sprintf("[" + app.cfg.Network + "] [TON Withdrawal Finalized]") - } else { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ERC-20 Withdrawal Finalized]") - } - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, erc20With.From, erc20With.To, erc20With.L1Token, erc20With.L2Token, Amount, tokenSymbol) - - app.notifier.Notify(title, text) + return app, nil } -func (app *App) L2DepEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L2 Deposit Event", "event", vLog) - - _, l2BridgeFilterer, err := app.getBridgeFilterers() - if err != nil { - return - } - - event, err := l2BridgeFilterer.ParseDepositFinalized(*vLog) - if err != nil { - log.GetLogger().Errorw("DepositFinalized event parsing fail", "error", err) - return - } - - l2Dep := bindings.L2StandardBridgeDepositFinalized{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } +func (p *App) Start(ctx context.Context) error { + var g errgroup.Group - // get symbol and decimals - var tokenSymbol string - var tokenDecimals int - - tokenAddress := l2Dep.L2Token - isETH := tokenAddress.Cmp(common.HexToAddress("0x4200000000000000000000000000000000000486")) == 0 - isTON := tokenAddress.Cmp(common.HexToAddress("0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000")) == 0 - - if isETH { - tokenSymbol = "ETH" - tokenDecimals = 18 - } else if isTON { - tokenSymbol = "TON" - tokenDecimals = 18 - } else { - tokenInfo, found := app.tokenInfo[tokenAddress.Hex()] - if !found { - log.GetLogger().Errorw("Token info not found for address", "tokenAddress", tokenAddress.Hex()) - return + g.Go(func() error { + err := p.l1Listener.Start(ctx) + if err != nil { + return err } - tokenSymbol = tokenInfo.Symbol - tokenDecimals = tokenInfo.Decimals - } - - Amount := app.formatAmount(l2Dep.Amount, tokenDecimals) - - var title string - var text string - - if isETH { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ETH Deposit Finalized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: ETH\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, l2Dep.L2Token, Amount, tokenSymbol) - } else if isTON { - title = fmt.Sprintf("[" + app.cfg.Network + "] [TON Deposit Finalized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, app.tonAddress, l2Dep.L2Token, Amount, tokenSymbol) - } else { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ERC-20 Deposit Finalized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, l2Dep.L1Token, l2Dep.L2Token, Amount, tokenSymbol) - } - app.notifier.Notify(title, text) -} - -func (app *App) L2WithEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L2 Withdrawal Event", "event", vLog) - - _, l2BridgeFilterer, err := app.getBridgeFilterers() - if err != nil { - return - } - - event, err := l2BridgeFilterer.ParseWithdrawalInitiated(*vLog) - if err != nil { - log.GetLogger().Errorw("WithdrawalInitiated event parsing fail", "error", err) - return - } - - l2With := bindings.L2StandardBridgeWithdrawalInitiated{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } + return nil + }) - // get symbol and decimals - var tokenSymbol string - var tokenDecimals int - - tokenAddress := l2With.L2Token - isETH := tokenAddress.Cmp(common.HexToAddress("0x4200000000000000000000000000000000000486")) == 0 - isTON := tokenAddress.Cmp(common.HexToAddress("0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000")) == 0 - - if isETH { - tokenSymbol = "ETH" - tokenDecimals = 18 - } else if isTON { - tokenSymbol = "TON" - tokenDecimals = 18 - } else { - tokenInfo, found := app.tokenInfo[tokenAddress.Hex()] - if !found { - log.GetLogger().Errorw("Token info not found for address", "tokenAddress", tokenAddress.Hex()) - return + g.Go(func() error { + err := p.l2Listener.Start(ctx) + if err != nil { + return err } - tokenSymbol = tokenInfo.Symbol - tokenDecimals = tokenInfo.Decimals - } - - Amount := app.formatAmount(l2With.Amount, tokenDecimals) - - var title string - var text string - - if isETH { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ETH Withdrawal Initialized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: ETH\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, l2With.L2Token, Amount, tokenSymbol) - } else if isTON { - title = fmt.Sprintf("[" + app.cfg.Network + "] [TON Withdrawal Initialized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, app.tonAddress, l2With.L2Token, Amount, tokenSymbol) - } else { - title = fmt.Sprintf("[" + app.cfg.Network + "] [ERC-20 Withdrawal Initialized]") - text = fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, l2With.L1Token, l2With.L2Token, Amount, tokenSymbol) - } - - app.notifier.Notify(title, text) -} -func (app *App) L1UsdcDepEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L1 USDC Deposit Event", "event", vLog) + return nil + }) - l1UsdcBridgeFilterer, _, err := app.getUsdcBridgeFilterers() - if err != nil { - return - } - - event, err := l1UsdcBridgeFilterer.ParseERC20DepositInitiated(*vLog) - if err != nil { - log.GetLogger().Errorw("USDC DepositInitiated event parsing fail", "error", err) - return - } - - l1UsdcDep := bindings.L1UsdcBridgeERC20DepositInitiated{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } - - Amount := app.formatAmount(l1UsdcDep.Amount, 6) - - // Slack notify title and text - title := fmt.Sprintf("[" + app.cfg.Network + "] [USDC Deposit Initialized]") - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l1UsdcDep.From, l1UsdcDep.To, l1UsdcDep.L1Token, l1UsdcDep.L2Token, Amount) - - app.notifier.Notify(title, text) -} - -func (app *App) L1UsdcWithEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L1 USDC Withdrawal Event", "event", vLog) - - l1UsdcBridgeFilterer, _, err := app.getUsdcBridgeFilterers() - if err != nil { - return - } - - event, err := l1UsdcBridgeFilterer.ParseERC20WithdrawalFinalized(*vLog) - if err != nil { - log.GetLogger().Errorw("USDC WithdrawalFinalized event parsing fail", "error", err) - return - } - - l1UsdcWith := bindings.L1UsdcBridgeERC20WithdrawalFinalized{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, + if err := g.Wait(); err != nil { + log.GetLogger().Errorw("Failed to start service", "error", err) + return err } - Amount := app.formatAmount(l1UsdcWith.Amount, 6) - - // Slack notify title and text - title := fmt.Sprintf("[" + app.cfg.Network + "] [USDC Withdrawal Finalized]") - text := fmt.Sprintf("Tx: "+app.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l1UsdcWith.From, l1UsdcWith.To, l1UsdcWith.L1Token, l1UsdcWith.L2Token, Amount) - - app.notifier.Notify(title, text) + return nil } -func (app *App) L2UsdcDepEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L2 USDC Deposit Event", "event", vLog) - - _, l2UsdcBridgeFilterer, err := app.getUsdcBridgeFilterers() - if err != nil { - return - } - - event, err := l2UsdcBridgeFilterer.ParseDepositFinalized(*vLog) +func (p *App) initL1Listener(ctx context.Context, slackNotifier *notification.SlackNotificationService, l1Client *bcclient.Client, redisClient redislib.UniversalClient) (*listener.EventService, error) { + l1SyncBlockMetadataRepo := repository.NewSyncBlockMetadataRepository(fmt.Sprintf("%s:%s", p.cfg.Network, "l1"), redisClient) + l1BlockKeeper, err := repository.NewBlockKeeper(ctx, l1Client, l1SyncBlockMetadataRepo) if err != nil { - log.GetLogger().Errorw("USDC DepositFinalized event parsing fail", "error", err) - return + log.GetLogger().Errorw("Failed to create L1 block keeper", "error", err) + return nil, err } - l2UsdcDep := bindings.L2UsdcBridgeDepositFinalized{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } - - Amount := app.formatAmount(l2UsdcDep.Amount, 6) - - title := fmt.Sprintf("[" + app.cfg.Network + "] [USDC Deposit Finalized]") - text := fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L1ExplorerUrl+"/address/%s\nTo: "+app.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l2UsdcDep.From, l2UsdcDep.To, l2UsdcDep.L1Token, l2UsdcDep.L2Token, Amount) - - app.notifier.Notify(title, text) -} - -func (app *App) L2UsdcWithEvent(vLog *types.Log) { - log.GetLogger().Infow("Got L2 USDC Withdrawal Event", "event", vLog) - - _, l2UsdcBridgeFilterer, err := app.getUsdcBridgeFilterers() + l1Service, err := listener.MakeService("l1-event-listener", l1Client, l1BlockKeeper) if err != nil { - return + log.GetLogger().Errorw("Failed to make L1 service", "error", err) + return nil, err } - event, err := l2UsdcBridgeFilterer.ParseWithdrawalInitiated(*vLog) - if err != nil { - log.GetLogger().Errorw("USDC WithdrawalInitiated event parsing fail", "error", err) - return - } - - l2UsdcWith := bindings.L2UsdcBridgeWithdrawalInitiated{ - L1Token: event.L1Token, - L2Token: event.L2Token, - From: event.From, - To: event.To, - Amount: event.Amount, - } - - Amount := app.formatAmount(l2UsdcWith.Amount, 6) - - title := fmt.Sprintf("[" + app.cfg.Network + "] [USDC Withdrawal Initialized]") - text := fmt.Sprintf("Tx: "+app.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+app.cfg.L2ExplorerUrl+"/address/%s\nTo: "+app.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+app.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+app.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l2UsdcWith.From, l2UsdcWith.To, l2UsdcWith.L1Token, l2UsdcWith.L2Token, Amount) - - app.notifier.Notify(title, text) -} - -func (app *App) Start() error { - l1Service := listener.MakeService(app.cfg.L1WsRpc) - l2Service := listener.MakeService(app.cfg.L2WsRpc) - // L1StandardBridge ETH deposit and withdrawal - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1StandardBridge, ETHDepositInitiatedEventABI, app.ETHDepEvent)) - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1StandardBridge, ETHWithdrawalFinalizedEventABI, app.ETHWithEvent)) + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1StandardBridge, ETHDepositInitiatedEventABI, p.depositETHInitiatedEvent)) + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1StandardBridge, ETHWithdrawalFinalizedEventABI, p.withdrawalETHFinalizedEvent)) // L1StandardBridge ERC20 deposit and withdrawal - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1StandardBridge, ERC20DepositInitiatedEventABI, app.ERC20DepEvent)) - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1StandardBridge, ERC20WithdrawalFinalizedEventABI, app.ERC20WithEvent)) - - // L2StandardBridge deposit and withdrawal - l2Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L2StandardBridge, DepositFinalizedEventABI, app.L2DepEvent)) - l2Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L2StandardBridge, WithdrawalInitiatedEventABI, app.L2WithEvent)) + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1StandardBridge, ERC20DepositInitiatedEventABI, p.depositERC20InitiatedEvent)) + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1StandardBridge, ERC20WithdrawalFinalizedEventABI, p.withdrawalERC20FinalizedEvent)) // L1UsdcBridge ERC20 deposit and withdrawal - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1UsdcBridge, ERC20DepositInitiatedEventABI, app.L1UsdcDepEvent)) - l1Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L1UsdcBridge, ERC20WithdrawalFinalizedEventABI, app.L1UsdcWithEvent)) - - // L2UsdcBridge ERC20 deposit and withdrawal - l2Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L2UsdcBridge, DepositFinalizedEventABI, app.L2UsdcDepEvent)) - l2Service.AddSubscribeRequest(listener.MakeEventRequest(app.cfg.L2UsdcBridge, WithdrawalInitiatedEventABI, app.L2UsdcWithEvent)) - - err := app.updateTokenInfo() - if err != nil { - log.GetLogger().Errorw("Failed to update token info", "err", err) - return err - } - - // Start both services - errCh := make(chan error, 2) - - go func() { - errCh <- l1Service.Start() - }() + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1UsdcBridge, ERC20DepositInitiatedEventABI, p.depositUsdcInitiatedEvent)) + l1Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L1UsdcBridge, ERC20WithdrawalFinalizedEventABI, p.withdrawalFinalizedEvent)) - go func() { - errCh <- l2Service.Start() - }() - - for i := 0; i < 2; i++ { - if err := <-errCh; err != nil { - log.GetLogger().Errorw("Failed to start service", "err", err) - return err - } - } - - return nil + return l1Service, nil } -func (app *App) updateTokenInfo() error { - data := &Data{cfg: app.cfg} - tokenInfoMap, err := data.tokenInfoMap() +func (p *App) initL2Listener(ctx context.Context, slackNotifier *notification.SlackNotificationService, l2Client *bcclient.Client, redisClient redislib.UniversalClient) (*listener.EventService, error) { + l2SyncBlockMetadataRepo := repository.NewSyncBlockMetadataRepository(fmt.Sprintf("%s:%s", p.cfg.Network, "l2"), redisClient) + l2BlockKeeper, err := repository.NewBlockKeeper(ctx, l2Client, l2SyncBlockMetadataRepo) if err != nil { - return err + log.GetLogger().Errorw("Failed to make L2 service", "error", err) + return nil, err } - for k, v := range tokenInfoMap { - if v.Symbol == "TON" { - app.tonAddress = k - break - } + l2Service, err := listener.MakeService("l2-event-listener", l2Client, l2BlockKeeper) + if err != nil { + log.GetLogger().Errorw("Failed to make L2 service", "error", err) + return nil, err } - app.tokenInfo = tokenInfoMap - - return nil -} - -func New(config *Config) *App { - slackNotifSrv := notification.MakeSlackNotificationService(config.SlackURL, 5) + // L2StandardBridge deposit and withdrawal + l2Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L2StandardBridge, DepositFinalizedEventABI, p.depositFinalizedEvent)) + l2Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L2StandardBridge, WithdrawalInitiatedEventABI, p.withdrawalInitiatedEvent)) - app := &App{ - cfg: config, - notifier: slackNotifSrv, - tokenInfo: make(map[string]TokenInfo), - } + // L2UsdcBridge ERC20 deposit and withdrawal + l2Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L2UsdcBridge, DepositFinalizedEventABI, p.DepositUsdcFinalizedEvent)) + l2Service.AddSubscribeRequest(listener.MakeEventRequest(slackNotifier, p.cfg.L2UsdcBridge, WithdrawalInitiatedEventABI, p.WithdrawalUsdcInitiatedEvent)) - return app + return l2Service, nil } diff --git a/internal/app/thanos-notif/config.go b/internal/app/thanos-notif/config.go index 19da732..62ff0fa 100644 --- a/internal/app/thanos-notif/config.go +++ b/internal/app/thanos-notif/config.go @@ -1,12 +1,16 @@ package thanosnotif +import ( + "errors" + + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/redis" +) + type Config struct { Network string - L1Rpc string L1WsRpc string - L2Rpc string L2WsRpc string L1StandardBridge string @@ -20,7 +24,40 @@ type Config struct { L1ExplorerUrl string L2ExplorerUrl string - OFF bool + L1TokenAddresses []string + L2TokenAddresses []string + + RedisConfig redis.Config +} + +func (c *Config) Validate() error { + if c.L1WsRpc == "" { + return errors.New("l1 ws rpc address is required") + } + + if c.L2WsRpc == "" { + return errors.New("l2 ws rpc address is required") + } + + if c.L1StandardBridge == "" { + return errors.New("l1 standard bridge is required") + } + + if c.L2StandardBridge == "" { + return errors.New("l2 standard bridge is required") + } + + if c.SlackURL == "" { + return errors.New("slack url is required") + } + + if len(c.L1TokenAddresses) == 0 { + return errors.New("token addresses is required") + } + + if c.RedisConfig.Addresses == "" { + return errors.New("redis address is required") + } - TokenAddresses []string + return nil } diff --git a/internal/app/thanos-notif/deposit.go b/internal/app/thanos-notif/deposit.go new file mode 100644 index 0000000..57ce78e --- /dev/null +++ b/internal/app/thanos-notif/deposit.go @@ -0,0 +1,202 @@ +package thanosnotif + +import ( + "fmt" + + ethereumTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" + "github.com/tokamak-network/tokamak-thanos/op-bindings/bindings" +) + +func (p *App) depositETHInitiatedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got ETH Deposit Event", "event", vLog) + + l1BridgeFilterer, _, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1BridgeFilterer.ParseETHDepositInitiated(*vLog) + if err != nil { + log.GetLogger().Errorw("ETHDepositInitiated event parsing fail", "error", err) + return "", "", err + } + + ethDep := bindings.L1StandardBridgeETHDepositInitiated{ + From: event.From, + To: event.To, + Amount: event.Amount, + } + + Amount := formatAmount(ethDep.Amount, 18) + + // Slack notify title and text + title := fmt.Sprintf("[" + p.cfg.Network + "] [ETH Deposit Initialized]") + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nAmount: %s ETH", vLog.TxHash, ethDep.From, ethDep.To, Amount) + + return title, text, nil +} + +func (p *App) depositERC20InitiatedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got ERC20 Deposit Event", "event", vLog) + + l1BridgeFilterer, _, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1BridgeFilterer.ParseERC20DepositInitiated(*vLog) + if err != nil { + log.GetLogger().Errorw("ERC20DepositInitiated event parsing fail", "error", err) + return "", "", err + } + + erc20Dep := bindings.L1StandardBridgeERC20DepositInitiated{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + // get symbol and decimals + l1Token := erc20Dep.L1Token + l1TokenInfo, found := p.l1TokensInfo[l1Token.Hex()] + if !found { + log.GetLogger().Errorw("Token info not found for address", "l1Token", l1Token.Hex()) + return "", "", err + } + + tokenSymbol := l1TokenInfo.Symbol + tokenDecimals := l1TokenInfo.Decimals + + amount := formatAmount(erc20Dep.Amount, tokenDecimals) + + // Slack notify title and text + var title string + + isTON := l1TokenInfo.Symbol == "TON" + + if isTON { + title = fmt.Sprintf("[" + p.cfg.Network + "] [TON Deposit Initialized]") + } else { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ERC-20 Deposit Initialized]") + } + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, erc20Dep.From, erc20Dep.To, erc20Dep.L1Token, erc20Dep.L2Token, amount, tokenSymbol) + + return title, text, nil +} + +func (p *App) depositFinalizedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L2 Deposit Event", "event", vLog) + + _, l2BridgeFilterer, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l2BridgeFilterer.ParseDepositFinalized(*vLog) + if err != nil { + log.GetLogger().Errorw("DepositFinalized event parsing fail", "error", err) + return "", "", err + } + + l2Dep := bindings.L2StandardBridgeDepositFinalized{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + // get symbol and decimals + l2Token := l2Dep.L2Token + + l2TokenInfo, found := p.l2TokensInfo[l2Token.Hex()] + if !found { + log.GetLogger().Errorw("Token info not found for address", "l2Token", l2Token.Hex()) + return "", "", err + } + + amount := formatAmount(l2Dep.Amount, l2TokenInfo.Decimals) + + var title string + var text string + + isETH := l2TokenInfo.Symbol == "ETH" + isTON := l2TokenInfo.Symbol == "TON" + + if isETH { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ETH Deposit Finalized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: ETH\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, l2Dep.L2Token, amount, l2TokenInfo.Symbol) + } else if isTON { + title = fmt.Sprintf("[" + p.cfg.Network + "] [TON Deposit Finalized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, l2Dep.L1Token, l2Dep.L2Token, amount, l2TokenInfo.Symbol) + } else { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ERC-20 Deposit Finalized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2Dep.From, l2Dep.To, l2Dep.L1Token, l2Dep.L2Token, amount, l2TokenInfo.Symbol) + } + + return title, text, nil +} + +func (p *App) depositUsdcInitiatedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L1 USDC Deposit Event", "event", vLog) + + l1UsdcBridgeFilterer, _, err := p.getUSDCBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1UsdcBridgeFilterer.ParseERC20DepositInitiated(*vLog) + if err != nil { + log.GetLogger().Errorw("USDC DepositInitiated event parsing fail", "error", err) + return "", "", err + } + + l1UsdcDep := bindings.L1UsdcBridgeERC20DepositInitiated{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + amount := formatAmount(l1UsdcDep.Amount, 6) + + // Slack notify title and text + title := fmt.Sprintf("[" + p.cfg.Network + "] [USDC Deposit Initialized]") + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\namount: %s USDC", vLog.TxHash, l1UsdcDep.From, l1UsdcDep.To, l1UsdcDep.L1Token, l1UsdcDep.L2Token, amount) + + return title, text, nil +} + +func (p *App) DepositUsdcFinalizedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L2 USDC Deposit Event", "event", vLog) + + _, l2UsdcBridgeFilterer, err := p.getUSDCBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l2UsdcBridgeFilterer.ParseDepositFinalized(*vLog) + if err != nil { + log.GetLogger().Errorw("USDC DepositFinalized event parsing fail", "error", err) + return "", "", err + } + + l2UsdcDep := bindings.L2UsdcBridgeDepositFinalized{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + amount := formatAmount(l2UsdcDep.Amount, 6) + + title := fmt.Sprintf("[" + p.cfg.Network + "] [USDC Deposit Finalized]") + text := fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L1ExplorerUrl+"/address/%s\nTo: "+p.cfg.L2ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l2UsdcDep.From, l2UsdcDep.To, l2UsdcDep.L1Token, l2UsdcDep.L2Token, amount) + + return title, text, nil +} diff --git a/internal/app/thanos-notif/filterer.go b/internal/app/thanos-notif/filterer.go new file mode 100644 index 0000000..11dfff3 --- /dev/null +++ b/internal/app/thanos-notif/filterer.go @@ -0,0 +1,39 @@ +package thanosnotif + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" + "github.com/tokamak-network/tokamak-thanos/op-bindings/bindings" +) + +func (p *App) getBridgeFilterers() (l1BridgeFilterer *bindings.L1StandardBridgeFilterer, l2BridgeFilterer *bindings.L2StandardBridgeFilterer, err error) { + l1BridgeFilterer, err = bindings.NewL1StandardBridgeFilterer(common.HexToAddress(p.cfg.L1StandardBridge), p.l1Client.GetClient()) + if err != nil { + log.GetLogger().Errorw("L1StandardBridgeFilterer instance fail", "error", err) + return nil, nil, err + } + + l2BridgeFilterer, err = bindings.NewL2StandardBridgeFilterer(common.HexToAddress(p.cfg.L2StandardBridge), p.l2Client.GetClient()) + if err != nil { + log.GetLogger().Errorw("L2StandardBridgeFilterer instance fail", "error", err) + return nil, nil, err + } + + return l1BridgeFilterer, l2BridgeFilterer, nil +} + +func (p *App) getUSDCBridgeFilterers() (l1UsdcBridgeFilterer *bindings.L1UsdcBridgeFilterer, l2UsdcBridgeFilterer *bindings.L2UsdcBridgeFilterer, err error) { + l1UsdcBridgeFilterer, err = bindings.NewL1UsdcBridgeFilterer(common.HexToAddress(p.cfg.L1UsdcBridge), p.l1Client.GetClient()) + if err != nil { + log.GetLogger().Errorw("Failed to init the L1UsdcBridgeFilterer", "error", err) + return nil, nil, err + } + + l2UsdcBridgeFilterer, err = bindings.NewL2UsdcBridgeFilterer(common.HexToAddress(p.cfg.L2UsdcBridge), p.l2Client.GetClient()) + if err != nil { + log.GetLogger().Errorw("Failed to init the L2UsdcBridgeFilterer", "error", err) + return nil, nil, err + } + + return l1UsdcBridgeFilterer, l2UsdcBridgeFilterer, nil +} diff --git a/internal/app/thanos-notif/helpers.go b/internal/app/thanos-notif/helpers.go index 8b144eb..df333cf 100644 --- a/internal/app/thanos-notif/helpers.go +++ b/internal/app/thanos-notif/helpers.go @@ -1,16 +1,18 @@ package thanosnotif import ( + "errors" "math/big" "strings" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/erc20" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" - "github.com/tokamak-network/tokamak-thanos/op-bindings/bindings" ) -func (app *App) formatAmount(amount *big.Int, tokenDecimals int) string { +func formatAmount(amount *big.Int, tokenDecimals int) string { amountFloat := new(big.Float).SetInt(amount) amountFloat.Quo(amountFloat, new(big.Float).SetInt(big.NewInt(0).Exp(big.NewInt(10), big.NewInt(int64(tokenDecimals)), nil))) formattedAmount := strings.TrimRight(strings.TrimRight(amountFloat.Text('f', tokenDecimals+1), "0"), ".") @@ -18,46 +20,24 @@ func (app *App) formatAmount(amount *big.Int, tokenDecimals int) string { return formattedAmount } -func (app *App) getBridgeFilterers() (l1BridgeFilterer *bindings.L1StandardBridgeFilterer, l2BridgeFilterer *bindings.L2StandardBridgeFilterer, err error) { - client, err := ethclient.Dial(app.cfg.L1Rpc) - if err != nil { - log.GetLogger().Errorw("Failed to connect to client", "error", err) - return nil, nil, err - } - - l1BridgeFilterer, err = bindings.NewL1StandardBridgeFilterer(common.HexToAddress(app.cfg.L1StandardBridge), client) - if err != nil { - log.GetLogger().Errorw("L1StandardBridgeFilterer instance fail", "error", err) - return nil, nil, err - } - - l2BridgeFilterer, err = bindings.NewL2StandardBridgeFilterer(common.HexToAddress(app.cfg.L2StandardBridge), client) - if err != nil { - log.GetLogger().Errorw("L2StandardBridgeFilterer instance fail", "error", err) - return nil, nil, err - } +func fetchTokensInfo(bcClient *bcclient.Client, tokenAddresses []string) (map[string]*types.Token, error) { + tokenInfoMap := make(map[string]*types.Token) + for _, tokenAddress := range tokenAddresses { + tokenInfo, err := erc20.FetchTokenInfo(bcClient, tokenAddress) + if err != nil { + log.GetLogger().Errorw("Failed to fetch token info", "error", err, "address", tokenAddress) + return nil, err + } - return l1BridgeFilterer, l2BridgeFilterer, nil -} + if tokenInfo == nil { + log.GetLogger().Errorw("Token info empty", "address", tokenAddress) + return nil, errors.New("token info is empty") + } -func (app *App) getUsdcBridgeFilterers() (l1UsdcBridgeFilterer *bindings.L1UsdcBridgeFilterer, l2UsdcBridgeFilterer *bindings.L2UsdcBridgeFilterer, err error) { - client, err := ethclient.Dial(app.cfg.L1Rpc) - if err != nil { - log.GetLogger().Errorw("Failed to connect to client", "error", err) - return nil, nil, err - } - - l1UsdcBridgeFilterer, err = bindings.NewL1UsdcBridgeFilterer(common.HexToAddress(app.cfg.L1UsdcBridge), client) - if err != nil { - log.GetLogger().Errorw("L1UsdcBridgeFilterer instance fail", "error", err) - return nil, nil, err - } + log.GetLogger().Infow("Got token info", "token", tokenInfo) - l2UsdcBridgeFilterer, err = bindings.NewL2UsdcBridgeFilterer(common.HexToAddress(app.cfg.L2UsdcBridge), client) - if err != nil { - log.GetLogger().Errorw("L2UsdcBridgeFilterer instance fail", "error", err) - return nil, nil, err + tokenInfoMap[tokenAddress] = tokenInfo } - return l1UsdcBridgeFilterer, l2UsdcBridgeFilterer, nil + return tokenInfoMap, nil } diff --git a/internal/app/thanos-notif/tokenInfo.go b/internal/app/thanos-notif/tokenInfo.go deleted file mode 100644 index f2648ea..0000000 --- a/internal/app/thanos-notif/tokenInfo.go +++ /dev/null @@ -1,159 +0,0 @@ -package thanosnotif - -import ( - "encoding/hex" - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "strings" -) - -type Data struct { - cfg *Config -} - -type EthCallResponse struct { - Jsonrpc string `json:"jsonrpc"` - ID int `json:"id"` - Result string `json:"result"` -} - -type TokenInfo struct { - Address string - Symbol string - Decimals int -} - -func (data *Data) tokenInfoMap() (map[string]TokenInfo, error) { - tokenInfoMap := make(map[string]TokenInfo) - checkAddresses := make(map[string]struct{}) - - for _, tokenAddress := range data.cfg.TokenAddresses { - tokenAddressLower := strings.ToLower(tokenAddress) - - if _, exists := checkAddresses[tokenAddressLower]; exists { - continue - } - - checkAddresses[tokenAddressLower] = struct{}{} - - symbol, decimals, err := data.getTokenInfo(tokenAddressLower) - if err != nil { - return nil, err - } - - tokenInfo := TokenInfo{ - Address: tokenAddress, - Symbol: symbol, - Decimals: decimals, - } - - tokenInfoMap[tokenAddress] = tokenInfo - tokenInfoMap[tokenAddressLower] = tokenInfo - - fmt.Printf("Token Address: %s, Symbol: %s, Decimals: %d\n", tokenAddress, symbol, decimals) - } - - return tokenInfoMap, nil -} - -func (data *Data) getTokenInfo(tokenAddress string) (string, int, error) { - client := &http.Client{} - - // get symbol data - symbolData := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_call","params":[{"to":"%s","data":"0x95d89b41"}, "latest"],"id":1}`, tokenAddress) - - symbolResp, err := data.makeRequest(client, symbolData) - if err != nil { - return "", 0, err - } - - symbol, err := decodeHexString(symbolResp.Result) - if err != nil { - return "", 0, err - } - - // get decimals data - decimalsData := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_call","params":[{"to":"%s","data":"0x313ce567"}, "latest"],"id":2}`, tokenAddress) - - decimalsResp, err := data.makeRequest(client, decimalsData) - if err != nil { - return "", 0, err - } - - decimals, err := decodeDecimals(decimalsResp.Result) - if err != nil { - return "", 0, err - } - - return symbol, decimals, nil -} - -func (data *Data) makeRequest(client *http.Client, tokenData string) (EthCallResponse, error) { - req, err := http.NewRequest("POST", data.cfg.L1Rpc, strings.NewReader(tokenData)) - if err != nil { - return EthCallResponse{}, err - } - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - return EthCallResponse{}, err - } - defer resp.Body.Close() - - bodyText, err := io.ReadAll(resp.Body) - if err != nil { - return EthCallResponse{}, err - } - - var ethCallResp EthCallResponse - err = json.Unmarshal(bodyText, ðCallResp) - if err != nil { - return EthCallResponse{}, err - } - - return ethCallResp, nil -} - -func decodeHexString(hexStr string) (string, error) { - if len(hexStr) < 2 { - return "", fmt.Errorf("hex string is too short") - } - resultHex := hexStr[2:] - resultBytes, err := hex.DecodeString(resultHex) - if err != nil { - return "", err - } - - decodedString := string(resultBytes) - - decodedString = strings.Map(func(r rune) rune { - if r < 32 || r == 127 { // ASCII - return -1 - } - return r - }, decodedString) - - decodedString = strings.TrimSpace(decodedString) - - return decodedString, nil -} - -func decodeDecimals(hexStr string) (int, error) { - if len(hexStr) < 2 { - return 0, fmt.Errorf("hex string is too short") - } - resultHex := hexStr[2:] - - if len(resultHex) > 0 { - decimalsInt, err := strconv.ParseInt(resultHex, 16, 64) - if err != nil { - return 0, err - } - return int(decimalsInt), nil - } - return 0, fmt.Errorf("invalid decimals value") -} diff --git a/internal/app/thanos-notif/withdrawal.go b/internal/app/thanos-notif/withdrawal.go new file mode 100644 index 0000000..8fc4ddc --- /dev/null +++ b/internal/app/thanos-notif/withdrawal.go @@ -0,0 +1,206 @@ +package thanosnotif + +import ( + "fmt" + + ethereumTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" + "github.com/tokamak-network/tokamak-thanos/op-bindings/bindings" +) + +func (p *App) withdrawalETHFinalizedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got ETH Withdrawal Event", "event", vLog) + + l1BridgeFilterer, _, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1BridgeFilterer.ParseETHWithdrawalFinalized(*vLog) + if err != nil { + log.GetLogger().Errorw("ETHWithdrawalFinalized event log parsing fail", "error", err) + return "", "", err + } + + ethWith := bindings.L1StandardBridgeETHWithdrawalFinalized{ + From: event.From, + To: event.To, + Amount: event.Amount, + } + + Amount := formatAmount(ethWith.Amount, 18) + + // Slack notify title and text + title := fmt.Sprintf("[" + p.cfg.Network + "] [ETH Withdrawal Finalized]") + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nAmount: %s ETH", vLog.TxHash, ethWith.From, ethWith.To, Amount) + + return title, text, nil +} + +func (p *App) withdrawalERC20FinalizedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got ERC20 Withdrawal Event", "event", vLog) + + l1BridgeFilterer, _, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1BridgeFilterer.ParseERC20WithdrawalFinalized(*vLog) + if err != nil { + log.GetLogger().Errorw("ERC20WithdrawalFinalized event parsing fail", "error", err) + return "", "", err + } + + erc20With := bindings.L1StandardBridgeERC20WithdrawalFinalized{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + // get symbol and decimals + l1Token := erc20With.L1Token + l1TokenInfo, found := p.l1TokensInfo[l1Token.Hex()] + if !found { + log.GetLogger().Errorw("Token info not found for address", "l1Token", l1Token.Hex()) + return "", "", err + } + + tokenSymbol := l1TokenInfo.Symbol + tokenDecimals := l1TokenInfo.Decimals + + amount := formatAmount(erc20With.Amount, tokenDecimals) + + // Slack notify title and text + var title string + + isTON := l1TokenInfo.Symbol == "TON" + + if isTON { + title = fmt.Sprintf("[" + p.cfg.Network + "] [TON Withdrawal Finalized]") + } else { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ERC-20 Withdrawal Finalized]") + } + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, erc20With.From, erc20With.To, erc20With.L1Token, erc20With.L2Token, amount, tokenSymbol) + + return title, text, nil +} + +func (p *App) withdrawalInitiatedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L2 Withdrawal Event", "event", vLog) + + _, l2BridgeFilterer, err := p.getBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l2BridgeFilterer.ParseWithdrawalInitiated(*vLog) + if err != nil { + log.GetLogger().Errorw("WithdrawalInitiated event parsing fail", "error", err) + return "", "", err + } + + l2With := bindings.L2StandardBridgeWithdrawalInitiated{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + // get symbol and decimals + var tokenSymbol string + var tokenDecimals int + + l2Token := l2With.L2Token + + l2TokenInfo, found := p.l2TokensInfo[l2Token.Hex()] + if !found { + log.GetLogger().Errorw("Token info not found for address", "l2Token", l2Token.Hex()) + return "", "", err + } + + amount := formatAmount(l2With.Amount, tokenDecimals) + + var title string + var text string + + isETH := l2TokenInfo.Symbol == "ETH" + isTON := l2TokenInfo.Symbol == "TON" + + if isETH { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ETH Withdrawal Initialized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: ETH\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, l2With.L2Token, amount, tokenSymbol) + } else if isTON { + title = fmt.Sprintf("[" + p.cfg.Network + "] [TON Withdrawal Initialized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, l2With.L1Token, l2With.L2Token, amount, tokenSymbol) + } else { + title = fmt.Sprintf("[" + p.cfg.Network + "] [ERC-20 Withdrawal Initialized]") + text = fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s %s", vLog.TxHash, l2With.From, l2With.To, l2With.L1Token, l2With.L2Token, amount, tokenSymbol) + } + + return title, text, nil +} + +func (p *App) withdrawalFinalizedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L1 USDC Withdrawal Event", "event", vLog) + + l1UsdcBridgeFilterer, _, err := p.getUSDCBridgeFilterers() + if err != nil { + return "", "", err + } + + event, err := l1UsdcBridgeFilterer.ParseERC20WithdrawalFinalized(*vLog) + if err != nil { + log.GetLogger().Errorw("USDC WithdrawalFinalized event parsing fail", "error", err) + return "", "", err + } + + l1UsdcWith := bindings.L1UsdcBridgeERC20WithdrawalFinalized{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + Amount := formatAmount(l1UsdcWith.Amount, 6) + + // Slack notify title and text + title := fmt.Sprintf("[" + p.cfg.Network + "] [USDC Withdrawal Finalized]") + text := fmt.Sprintf("Tx: "+p.cfg.L1ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l1UsdcWith.From, l1UsdcWith.To, l1UsdcWith.L1Token, l1UsdcWith.L2Token, Amount) + + return title, text, nil +} + +func (p *App) WithdrawalUsdcInitiatedEvent(vLog *ethereumTypes.Log) (string, string, error) { + log.GetLogger().Infow("Got L2 USDC Withdrawal Event", "event", vLog) + + _, l2UsdcBridgeFilterer, err := p.getUSDCBridgeFilterers() + if err != nil { + log.GetLogger().Errorw("Failed to get USDC bridge filters", "error", err) + return "", "", err + } + + event, err := l2UsdcBridgeFilterer.ParseWithdrawalInitiated(*vLog) + if err != nil { + log.GetLogger().Errorw("Failed to parse the USDC WithdrawalInitiated event", "error", err) + return "", "", err + } + + l2UsdcWith := bindings.L2UsdcBridgeWithdrawalInitiated{ + L1Token: event.L1Token, + L2Token: event.L2Token, + From: event.From, + To: event.To, + Amount: event.Amount, + } + + Amount := formatAmount(l2UsdcWith.Amount, 6) + + title := fmt.Sprintf("[" + p.cfg.Network + "] [USDC Withdrawal Initialized]") + text := fmt.Sprintf("Tx: "+p.cfg.L2ExplorerUrl+"/tx/%s\nFrom: "+p.cfg.L2ExplorerUrl+"/address/%s\nTo: "+p.cfg.L1ExplorerUrl+"/address/%s\nL1Token: "+p.cfg.L1ExplorerUrl+"/token/%s\nL2Token: "+p.cfg.L2ExplorerUrl+"/token/%s\nAmount: %s USDC", vLog.TxHash, l2UsdcWith.From, l2UsdcWith.To, l2UsdcWith.L1Token, l2UsdcWith.L2Token, Amount) + + return title, text, nil +} diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go new file mode 100644 index 0000000..aa61b3f --- /dev/null +++ b/internal/pkg/bcclient/client.go @@ -0,0 +1,147 @@ +package bcclient + +import ( + "context" + "math/big" + "net/http" + "time" + + ethereumTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "golang.org/x/sync/errgroup" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" +) + +type Client struct { + defaultClient *ethclient.Client + chainID *big.Int +} + +func New(ctx context.Context, rpcURL string) (*Client, error) { + httpClient := &http.Client{ + Timeout: 3 * time.Second, + } + rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient)) + if err != nil { + return nil, err + } + + ethClient := ethclient.NewClient(rpcClient) + + chainID, err := ethClient.ChainID(ctx) + if err != nil { + return nil, err + } + + return &Client{ + defaultClient: ethClient, + chainID: chainID, + }, nil +} + +func (c *Client) GetClient() *ethclient.Client { + return c.defaultClient +} + +func (c *Client) SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) { + return c.defaultClient.SubscribeNewHead(ctx, newHeadCh) +} + +func (c *Client) BlockNumber(ctx context.Context) (uint64, error) { + return c.defaultClient.BlockNumber(ctx) +} + +func (c *Client) GetHeader(ctx context.Context) (*ethereumTypes.Header, error) { + return c.defaultClient.HeaderByNumber(ctx, nil) +} + +func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethereumTypes.Header, error) { + headerAtBlockNo, err := c.defaultClient.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNo)) + if err != nil { + return nil, err + } + + return headerAtBlockNo, nil +} + +func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) { + timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + query := ethereum.FilterQuery{ + BlockHash: &blockHash, + } + + // Get the logs + logs, err := c.defaultClient.FilterLogs(timeOutCtx, query) + if err != nil { + log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err) + return nil, err + } + + return logs, nil +} + +func (c *Client) HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) { + headerAtBlockHash, err := c.defaultClient.HeaderByHash(ctx, blockHash) + if err != nil { + return nil, err + } + + return headerAtBlockHash, nil +} + +func (c *Client) GetBlocks(ctx context.Context, withLogs bool, fromBlock, toBlock uint64) ([]*types.NewBlock, error) { + log.GetLogger().Infow("Fetch blocks info", "from_block", fromBlock, "to_block", toBlock) + totalBlocks := toBlock - fromBlock + 1 + + blocks := make([]*types.NewBlock, totalBlocks) + + g, _ := errgroup.WithContext(ctx) + for index := uint64(0); index < totalBlocks; index++ { + index := index + blockNo := index + fromBlock + + g.Go(func() error { + header, err := c.HeaderAtBlockNumber(ctx, blockNo) + if err != nil { + log.GetLogger().Errorw("Failed to get block header", "err", err) + return err + } + + blocks[index] = &types.NewBlock{ + Header: header, + } + + if withLogs { + logs, err := c.GetLogs(ctx, header.Hash()) + if err != nil { + log.GetLogger().Errorw("Failed to get block logs", "err", err) + return err + } + blocks[index].Logs = logs + } + + return nil + }) + } + + err := g.Wait() + if err != nil { + log.GetLogger().Errorw("Failed to get the block header", "err", err) + + return nil, err + } + + if len(blocks) == 0 { + return nil, nil + } + + return blocks, nil +} diff --git a/internal/pkg/constant/constant.go b/internal/pkg/constant/constant.go new file mode 100644 index 0000000..5a9bb70 --- /dev/null +++ b/internal/pkg/constant/constant.go @@ -0,0 +1,7 @@ +package constant + +import ( + "github.com/ethereum/go-ethereum/common" +) + +var ZeroHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") diff --git a/internal/pkg/erc20/abi.json b/internal/pkg/erc20/abi.json new file mode 100644 index 0000000..668d697 --- /dev/null +++ b/internal/pkg/erc20/abi.json @@ -0,0 +1,222 @@ +[ + { + "constant": true, + "inputs": [], + "name": "name", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_spender", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "approve", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "totalSupply", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_from", + "type": "address" + }, + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transferFrom", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "decimals", + "outputs": [ + { + "name": "", + "type": "uint8" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + } + ], + "name": "balanceOf", + "outputs": [ + { + "name": "balance", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "symbol", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transfer", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + }, + { + "name": "_spender", + "type": "address" + } + ], + "name": "allowance", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "payable": true, + "stateMutability": "payable", + "type": "fallback" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "owner", + "type": "address" + }, + { + "indexed": true, + "name": "spender", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Approval", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "from", + "type": "address" + }, + { + "indexed": true, + "name": "to", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Transfer", + "type": "event" + } +] \ No newline at end of file diff --git a/internal/pkg/erc20/client.go b/internal/pkg/erc20/client.go new file mode 100644 index 0000000..9a92048 --- /dev/null +++ b/internal/pkg/erc20/client.go @@ -0,0 +1,35 @@ +package erc20 + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" +) + +func FetchTokenInfo(bcClient *bcclient.Client, tokenAddress string) (*types.Token, error) { + erc20Instance, err := NewErc20(common.HexToAddress(tokenAddress), bcClient.GetClient()) + if err != nil { + log.GetLogger().Errorw("Failed to create erc20 instance", "error", err) + return nil, err + } + + symbol, err := erc20Instance.Symbol(nil) + if err != nil { + log.GetLogger().Errorw("Failed to get symbol", "error", err) + return nil, err + } + + decimals, err := erc20Instance.Decimals(nil) + if err != nil { + log.GetLogger().Errorw("Failed to get decimals", "error", err) + return nil, err + } + + return &types.Token{ + Decimals: int(decimals), + Symbol: symbol, + Address: tokenAddress, + }, nil +} diff --git a/internal/pkg/erc20/erc20.go b/internal/pkg/erc20/erc20.go new file mode 100644 index 0000000..1a807a0 --- /dev/null +++ b/internal/pkg/erc20/erc20.go @@ -0,0 +1,759 @@ +// Code generated - DO NOT EDIT. +// This file is a generated binding and any manual changes will be lost. + +package erc20 + +import ( + "errors" + "math/big" + "strings" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" +) + +// Reference imports to suppress errors if they are not otherwise used. +var ( + _ = errors.New + _ = big.NewInt + _ = strings.NewReader + _ = ethereum.NotFound + _ = bind.Bind + _ = common.Big1 + _ = types.BloomLookup + _ = event.NewSubscription + _ = abi.ConvertType +) + +// Erc20MetaData contains all meta data concerning the Erc20 contract. +var Erc20MetaData = &bind.MetaData{ + ABI: "[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":false,\"inputs\":[{\"name\":\"_spender\",\"type\":\"address\"},{\"name\":\"_value\",\"type\":\"uint256\"}],\"name\":\"approve\",\"outputs\":[{\"name\":\"\",\"type\":\"bool\"}],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":false,\"inputs\":[{\"name\":\"_from\",\"type\":\"address\"},{\"name\":\"_to\",\"type\":\"address\"},{\"name\":\"_value\",\"type\":\"uint256\"}],\"name\":\"transferFrom\",\"outputs\":[{\"name\":\"\",\"type\":\"bool\"}],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"decimals\",\"outputs\":[{\"name\":\"\",\"type\":\"uint8\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[{\"name\":\"_owner\",\"type\":\"address\"}],\"name\":\"balanceOf\",\"outputs\":[{\"name\":\"balance\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"symbol\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":false,\"inputs\":[{\"name\":\"_to\",\"type\":\"address\"},{\"name\":\"_value\",\"type\":\"uint256\"}],\"name\":\"transfer\",\"outputs\":[{\"name\":\"\",\"type\":\"bool\"}],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[{\"name\":\"_owner\",\"type\":\"address\"},{\"name\":\"_spender\",\"type\":\"address\"}],\"name\":\"allowance\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"payable\":true,\"stateMutability\":\"payable\",\"type\":\"fallback\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"owner\",\"type\":\"address\"},{\"indexed\":true,\"name\":\"spender\",\"type\":\"address\"},{\"indexed\":false,\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Approval\",\"type\":\"event\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"}]", +} + +// Erc20ABI is the input ABI used to generate the binding from. +// Deprecated: Use Erc20MetaData.ABI instead. +var Erc20ABI = Erc20MetaData.ABI + +// Erc20 is an auto generated Go binding around an Ethereum contract. +type Erc20 struct { + Erc20Caller // Read-only binding to the contract + Erc20Transactor // Write-only binding to the contract + Erc20Filterer // Log filterer for contract events +} + +// Erc20Caller is an auto generated read-only Go binding around an Ethereum contract. +type Erc20Caller struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// Erc20Transactor is an auto generated write-only Go binding around an Ethereum contract. +type Erc20Transactor struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// Erc20Filterer is an auto generated log filtering Go binding around an Ethereum contract events. +type Erc20Filterer struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// Erc20Session is an auto generated Go binding around an Ethereum contract, +// with pre-set call and transact options. +type Erc20Session struct { + Contract *Erc20 // Generic contract binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// Erc20CallerSession is an auto generated read-only Go binding around an Ethereum contract, +// with pre-set call options. +type Erc20CallerSession struct { + Contract *Erc20Caller // Generic contract caller binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session +} + +// Erc20TransactorSession is an auto generated write-only Go binding around an Ethereum contract, +// with pre-set transact options. +type Erc20TransactorSession struct { + Contract *Erc20Transactor // Generic contract transactor binding to set the session for + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// Erc20Raw is an auto generated low-level Go binding around an Ethereum contract. +type Erc20Raw struct { + Contract *Erc20 // Generic contract binding to access the raw methods on +} + +// Erc20CallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract. +type Erc20CallerRaw struct { + Contract *Erc20Caller // Generic read-only contract binding to access the raw methods on +} + +// Erc20TransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract. +type Erc20TransactorRaw struct { + Contract *Erc20Transactor // Generic write-only contract binding to access the raw methods on +} + +// NewErc20 creates a new instance of Erc20, bound to a specific deployed contract. +func NewErc20(address common.Address, backend bind.ContractBackend) (*Erc20, error) { + contract, err := bindErc20(address, backend, backend, backend) + if err != nil { + return nil, err + } + return &Erc20{Erc20Caller: Erc20Caller{contract: contract}, Erc20Transactor: Erc20Transactor{contract: contract}, Erc20Filterer: Erc20Filterer{contract: contract}}, nil +} + +// NewErc20Caller creates a new read-only instance of Erc20, bound to a specific deployed contract. +func NewErc20Caller(address common.Address, caller bind.ContractCaller) (*Erc20Caller, error) { + contract, err := bindErc20(address, caller, nil, nil) + if err != nil { + return nil, err + } + return &Erc20Caller{contract: contract}, nil +} + +// NewErc20Transactor creates a new write-only instance of Erc20, bound to a specific deployed contract. +func NewErc20Transactor(address common.Address, transactor bind.ContractTransactor) (*Erc20Transactor, error) { + contract, err := bindErc20(address, nil, transactor, nil) + if err != nil { + return nil, err + } + return &Erc20Transactor{contract: contract}, nil +} + +// NewErc20Filterer creates a new log filterer instance of Erc20, bound to a specific deployed contract. +func NewErc20Filterer(address common.Address, filterer bind.ContractFilterer) (*Erc20Filterer, error) { + contract, err := bindErc20(address, nil, nil, filterer) + if err != nil { + return nil, err + } + return &Erc20Filterer{contract: contract}, nil +} + +// bindErc20 binds a generic wrapper to an already deployed contract. +func bindErc20(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) { + parsed, err := Erc20MetaData.GetAbi() + if err != nil { + return nil, err + } + return bind.NewBoundContract(address, *parsed, caller, transactor, filterer), nil +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_Erc20 *Erc20Raw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { + return _Erc20.Contract.Erc20Caller.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_Erc20 *Erc20Raw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _Erc20.Contract.Erc20Transactor.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_Erc20 *Erc20Raw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _Erc20.Contract.Erc20Transactor.contract.Transact(opts, method, params...) +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_Erc20 *Erc20CallerRaw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { + return _Erc20.Contract.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_Erc20 *Erc20TransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _Erc20.Contract.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_Erc20 *Erc20TransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _Erc20.Contract.contract.Transact(opts, method, params...) +} + +// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. +// +// Solidity: function allowance(address _owner, address _spender) view returns(uint256) +func (_Erc20 *Erc20Caller) Allowance(opts *bind.CallOpts, _owner common.Address, _spender common.Address) (*big.Int, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "allowance", _owner, _spender) + + if err != nil { + return *new(*big.Int), err + } + + out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) + + return out0, err + +} + +// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. +// +// Solidity: function allowance(address _owner, address _spender) view returns(uint256) +func (_Erc20 *Erc20Session) Allowance(_owner common.Address, _spender common.Address) (*big.Int, error) { + return _Erc20.Contract.Allowance(&_Erc20.CallOpts, _owner, _spender) +} + +// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. +// +// Solidity: function allowance(address _owner, address _spender) view returns(uint256) +func (_Erc20 *Erc20CallerSession) Allowance(_owner common.Address, _spender common.Address) (*big.Int, error) { + return _Erc20.Contract.Allowance(&_Erc20.CallOpts, _owner, _spender) +} + +// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. +// +// Solidity: function balanceOf(address _owner) view returns(uint256 balance) +func (_Erc20 *Erc20Caller) BalanceOf(opts *bind.CallOpts, _owner common.Address) (*big.Int, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "balanceOf", _owner) + + if err != nil { + return *new(*big.Int), err + } + + out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) + + return out0, err + +} + +// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. +// +// Solidity: function balanceOf(address _owner) view returns(uint256 balance) +func (_Erc20 *Erc20Session) BalanceOf(_owner common.Address) (*big.Int, error) { + return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, _owner) +} + +// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. +// +// Solidity: function balanceOf(address _owner) view returns(uint256 balance) +func (_Erc20 *Erc20CallerSession) BalanceOf(_owner common.Address) (*big.Int, error) { + return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, _owner) +} + +// Decimals is a free data retrieval call binding the contract method 0x313ce567. +// +// Solidity: function decimals() view returns(uint8) +func (_Erc20 *Erc20Caller) Decimals(opts *bind.CallOpts) (uint8, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "decimals") + + if err != nil { + return *new(uint8), err + } + + out0 := *abi.ConvertType(out[0], new(uint8)).(*uint8) + + return out0, err + +} + +// Decimals is a free data retrieval call binding the contract method 0x313ce567. +// +// Solidity: function decimals() view returns(uint8) +func (_Erc20 *Erc20Session) Decimals() (uint8, error) { + return _Erc20.Contract.Decimals(&_Erc20.CallOpts) +} + +// Decimals is a free data retrieval call binding the contract method 0x313ce567. +// +// Solidity: function decimals() view returns(uint8) +func (_Erc20 *Erc20CallerSession) Decimals() (uint8, error) { + return _Erc20.Contract.Decimals(&_Erc20.CallOpts) +} + +// Name is a free data retrieval call binding the contract method 0x06fdde03. +// +// Solidity: function name() view returns(string) +func (_Erc20 *Erc20Caller) Name(opts *bind.CallOpts) (string, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "name") + + if err != nil { + return *new(string), err + } + + out0 := *abi.ConvertType(out[0], new(string)).(*string) + + return out0, err + +} + +// Name is a free data retrieval call binding the contract method 0x06fdde03. +// +// Solidity: function name() view returns(string) +func (_Erc20 *Erc20Session) Name() (string, error) { + return _Erc20.Contract.Name(&_Erc20.CallOpts) +} + +// Name is a free data retrieval call binding the contract method 0x06fdde03. +// +// Solidity: function name() view returns(string) +func (_Erc20 *Erc20CallerSession) Name() (string, error) { + return _Erc20.Contract.Name(&_Erc20.CallOpts) +} + +// Symbol is a free data retrieval call binding the contract method 0x95d89b41. +// +// Solidity: function symbol() view returns(string) +func (_Erc20 *Erc20Caller) Symbol(opts *bind.CallOpts) (string, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "symbol") + + if err != nil { + return *new(string), err + } + + out0 := *abi.ConvertType(out[0], new(string)).(*string) + + return out0, err + +} + +// Symbol is a free data retrieval call binding the contract method 0x95d89b41. +// +// Solidity: function symbol() view returns(string) +func (_Erc20 *Erc20Session) Symbol() (string, error) { + return _Erc20.Contract.Symbol(&_Erc20.CallOpts) +} + +// Symbol is a free data retrieval call binding the contract method 0x95d89b41. +// +// Solidity: function symbol() view returns(string) +func (_Erc20 *Erc20CallerSession) Symbol() (string, error) { + return _Erc20.Contract.Symbol(&_Erc20.CallOpts) +} + +// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. +// +// Solidity: function totalSupply() view returns(uint256) +func (_Erc20 *Erc20Caller) TotalSupply(opts *bind.CallOpts) (*big.Int, error) { + var out []interface{} + err := _Erc20.contract.Call(opts, &out, "totalSupply") + + if err != nil { + return *new(*big.Int), err + } + + out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) + + return out0, err + +} + +// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. +// +// Solidity: function totalSupply() view returns(uint256) +func (_Erc20 *Erc20Session) TotalSupply() (*big.Int, error) { + return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts) +} + +// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. +// +// Solidity: function totalSupply() view returns(uint256) +func (_Erc20 *Erc20CallerSession) TotalSupply() (*big.Int, error) { + return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts) +} + +// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. +// +// Solidity: function approve(address _spender, uint256 _value) returns(bool) +func (_Erc20 *Erc20Transactor) Approve(opts *bind.TransactOpts, _spender common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.contract.Transact(opts, "approve", _spender, _value) +} + +// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. +// +// Solidity: function approve(address _spender, uint256 _value) returns(bool) +func (_Erc20 *Erc20Session) Approve(_spender common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.Approve(&_Erc20.TransactOpts, _spender, _value) +} + +// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. +// +// Solidity: function approve(address _spender, uint256 _value) returns(bool) +func (_Erc20 *Erc20TransactorSession) Approve(_spender common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.Approve(&_Erc20.TransactOpts, _spender, _value) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20Transactor) Transfer(opts *bind.TransactOpts, _to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.contract.Transact(opts, "transfer", _to, _value) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20Session) Transfer(_to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, _to, _value) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20TransactorSession) Transfer(_to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, _to, _value) +} + +// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. +// +// Solidity: function transferFrom(address _from, address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20Transactor) TransferFrom(opts *bind.TransactOpts, _from common.Address, _to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.contract.Transact(opts, "transferFrom", _from, _to, _value) +} + +// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. +// +// Solidity: function transferFrom(address _from, address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20Session) TransferFrom(_from common.Address, _to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, _from, _to, _value) +} + +// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. +// +// Solidity: function transferFrom(address _from, address _to, uint256 _value) returns(bool) +func (_Erc20 *Erc20TransactorSession) TransferFrom(_from common.Address, _to common.Address, _value *big.Int) (*types.Transaction, error) { + return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, _from, _to, _value) +} + +// Fallback is a paid mutator transaction binding the contract fallback function. +// +// Solidity: fallback() payable returns() +func (_Erc20 *Erc20Transactor) Fallback(opts *bind.TransactOpts, calldata []byte) (*types.Transaction, error) { + return _Erc20.contract.RawTransact(opts, calldata) +} + +// Fallback is a paid mutator transaction binding the contract fallback function. +// +// Solidity: fallback() payable returns() +func (_Erc20 *Erc20Session) Fallback(calldata []byte) (*types.Transaction, error) { + return _Erc20.Contract.Fallback(&_Erc20.TransactOpts, calldata) +} + +// Fallback is a paid mutator transaction binding the contract fallback function. +// +// Solidity: fallback() payable returns() +func (_Erc20 *Erc20TransactorSession) Fallback(calldata []byte) (*types.Transaction, error) { + return _Erc20.Contract.Fallback(&_Erc20.TransactOpts, calldata) +} + +// Erc20ApprovalIterator is returned from FilterApproval and is used to iterate over the raw logs and unpacked data for Approval events raised by the Erc20 contract. +type Erc20ApprovalIterator struct { + Event *Erc20Approval // Event containing the contract specifics and raw log + + contract *bind.BoundContract // Generic contract to use for unpacking event data + event string // Event name to use for unpacking event data + + logs chan types.Log // Log channel receiving the found contract events + sub ethereum.Subscription // Subscription for errors, completion and termination + done bool // Whether the subscription completed delivering logs + fail error // Occurred error to stop iteration +} + +// Next advances the iterator to the subsequent event, returning whether there +// are any more events found. In case of a retrieval or parsing error, false is +// returned and Error() can be queried for the exact failure. +func (it *Erc20ApprovalIterator) Next() bool { + // If the iterator failed, stop iterating + if it.fail != nil { + return false + } + // If the iterator completed, deliver directly whatever's available + if it.done { + select { + case log := <-it.logs: + it.Event = new(Erc20Approval) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + default: + return false + } + } + // Iterator still in progress, wait for either a data or an error event + select { + case log := <-it.logs: + it.Event = new(Erc20Approval) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + case err := <-it.sub.Err(): + it.done = true + it.fail = err + return it.Next() + } +} + +// Error returns any retrieval or parsing error occurred during filtering. +func (it *Erc20ApprovalIterator) Error() error { + return it.fail +} + +// Close terminates the iteration process, releasing any pending underlying +// resources. +func (it *Erc20ApprovalIterator) Close() error { + it.sub.Unsubscribe() + return nil +} + +// Erc20Approval represents a Approval event raised by the Erc20 contract. +type Erc20Approval struct { + Owner common.Address + Spender common.Address + Value *big.Int + Raw types.Log // Blockchain specific contextual infos +} + +// FilterApproval is a free log retrieval operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. +// +// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) +func (_Erc20 *Erc20Filterer) FilterApproval(opts *bind.FilterOpts, owner []common.Address, spender []common.Address) (*Erc20ApprovalIterator, error) { + + var ownerRule []interface{} + for _, ownerItem := range owner { + ownerRule = append(ownerRule, ownerItem) + } + var spenderRule []interface{} + for _, spenderItem := range spender { + spenderRule = append(spenderRule, spenderItem) + } + + logs, sub, err := _Erc20.contract.FilterLogs(opts, "Approval", ownerRule, spenderRule) + if err != nil { + return nil, err + } + return &Erc20ApprovalIterator{contract: _Erc20.contract, event: "Approval", logs: logs, sub: sub}, nil +} + +// WatchApproval is a free log subscription operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. +// +// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) +func (_Erc20 *Erc20Filterer) WatchApproval(opts *bind.WatchOpts, sink chan<- *Erc20Approval, owner []common.Address, spender []common.Address) (event.Subscription, error) { + + var ownerRule []interface{} + for _, ownerItem := range owner { + ownerRule = append(ownerRule, ownerItem) + } + var spenderRule []interface{} + for _, spenderItem := range spender { + spenderRule = append(spenderRule, spenderItem) + } + + logs, sub, err := _Erc20.contract.WatchLogs(opts, "Approval", ownerRule, spenderRule) + if err != nil { + return nil, err + } + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case log := <-logs: + // New log arrived, parse the event and forward to the user + event := new(Erc20Approval) + if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil { + return err + } + event.Raw = log + + select { + case sink <- event: + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + } + }), nil +} + +// ParseApproval is a log parse operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. +// +// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) +func (_Erc20 *Erc20Filterer) ParseApproval(log types.Log) (*Erc20Approval, error) { + event := new(Erc20Approval) + if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil { + return nil, err + } + event.Raw = log + return event, nil +} + +// Erc20TransferIterator is returned from FilterTransfer and is used to iterate over the raw logs and unpacked data for Transfer events raised by the Erc20 contract. +type Erc20TransferIterator struct { + Event *Erc20Transfer // Event containing the contract specifics and raw log + + contract *bind.BoundContract // Generic contract to use for unpacking event data + event string // Event name to use for unpacking event data + + logs chan types.Log // Log channel receiving the found contract events + sub ethereum.Subscription // Subscription for errors, completion and termination + done bool // Whether the subscription completed delivering logs + fail error // Occurred error to stop iteration +} + +// Next advances the iterator to the subsequent event, returning whether there +// are any more events found. In case of a retrieval or parsing error, false is +// returned and Error() can be queried for the exact failure. +func (it *Erc20TransferIterator) Next() bool { + // If the iterator failed, stop iterating + if it.fail != nil { + return false + } + // If the iterator completed, deliver directly whatever's available + if it.done { + select { + case log := <-it.logs: + it.Event = new(Erc20Transfer) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + default: + return false + } + } + // Iterator still in progress, wait for either a data or an error event + select { + case log := <-it.logs: + it.Event = new(Erc20Transfer) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + case err := <-it.sub.Err(): + it.done = true + it.fail = err + return it.Next() + } +} + +// Error returns any retrieval or parsing error occurred during filtering. +func (it *Erc20TransferIterator) Error() error { + return it.fail +} + +// Close terminates the iteration process, releasing any pending underlying +// resources. +func (it *Erc20TransferIterator) Close() error { + it.sub.Unsubscribe() + return nil +} + +// Erc20Transfer represents a Transfer event raised by the Erc20 contract. +type Erc20Transfer struct { + From common.Address + To common.Address + Value *big.Int + Raw types.Log // Blockchain specific contextual infos +} + +// FilterTransfer is a free log retrieval operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. +// +// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) +func (_Erc20 *Erc20Filterer) FilterTransfer(opts *bind.FilterOpts, from []common.Address, to []common.Address) (*Erc20TransferIterator, error) { + + var fromRule []interface{} + for _, fromItem := range from { + fromRule = append(fromRule, fromItem) + } + var toRule []interface{} + for _, toItem := range to { + toRule = append(toRule, toItem) + } + + logs, sub, err := _Erc20.contract.FilterLogs(opts, "Transfer", fromRule, toRule) + if err != nil { + return nil, err + } + return &Erc20TransferIterator{contract: _Erc20.contract, event: "Transfer", logs: logs, sub: sub}, nil +} + +// WatchTransfer is a free log subscription operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. +// +// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) +func (_Erc20 *Erc20Filterer) WatchTransfer(opts *bind.WatchOpts, sink chan<- *Erc20Transfer, from []common.Address, to []common.Address) (event.Subscription, error) { + + var fromRule []interface{} + for _, fromItem := range from { + fromRule = append(fromRule, fromItem) + } + var toRule []interface{} + for _, toItem := range to { + toRule = append(toRule, toItem) + } + + logs, sub, err := _Erc20.contract.WatchLogs(opts, "Transfer", fromRule, toRule) + if err != nil { + return nil, err + } + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case log := <-logs: + // New log arrived, parse the event and forward to the user + event := new(Erc20Transfer) + if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil { + return err + } + event.Raw = log + + select { + case sink <- event: + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + } + }), nil +} + +// ParseTransfer is a log parse operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. +// +// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) +func (_Erc20 *Erc20Filterer) ParseTransfer(log types.Log) (*Erc20Transfer, error) { + event := new(Erc20Transfer) + if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil { + return nil, err + } + event.Raw = log + return event, nil +} diff --git a/internal/pkg/listener/event.go b/internal/pkg/listener/event.go deleted file mode 100644 index c4fe920..0000000 --- a/internal/pkg/listener/event.go +++ /dev/null @@ -1,176 +0,0 @@ -package listener - -import ( - "bytes" - "context" - "encoding/gob" - "fmt" - "math/big" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - - "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" -) - -var ( - RequestEventType = 1 -) - -type RequestSubscriber interface { - GetRequestType() int - SerializeEventRequest() string - Callback(item interface{}) -} - -type EventService struct { - host string - client *ethclient.Client - requests []RequestSubscriber - requestMap map[string]RequestSubscriber - startBlock *big.Int - filter *CounterBloom -} - -func MakeService(host string) *EventService { - service := &EventService{host: host, filter: MakeDefaultCounterBloom()} - service.initialize() - return service -} - -func MakeServiceWithStartBlock(host string, start *big.Int) *EventService { - service := &EventService{host: host, startBlock: start, filter: MakeDefaultCounterBloom()} - service.initialize() - return service -} - -func (service *EventService) initialize() { - service.requests = make([]RequestSubscriber, 0) - service.requestMap = make(map[string]RequestSubscriber) -} - -func (service *EventService) existRequest(request RequestSubscriber) bool { - key := request.SerializeEventRequest() - _, ok := service.requestMap[key] - return ok -} - -func (service *EventService) RequestByKey(key string) RequestSubscriber { - request, ok := service.requestMap[key] - if ok { - return request - } else { - return nil - } -} - -func (service *EventService) AddSubscribeRequest(request RequestSubscriber) { - if service.existRequest(request) { - return - } - service.requests = append(service.requests, request) - key := request.SerializeEventRequest() - service.requestMap[key] = request -} - -func (service *EventService) CanProcess(log *types.Log) bool { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - - err := enc.Encode(log) - if err != nil { - return false - } - data := buf.Bytes() - - if service.filter.Test(data) { - return false - } - service.filter.Add(data) - return true -} - -func (service *EventService) Start() error { - log.GetLogger().Infow("Start to listen", "host", service.host) - client, err := ethclient.Dial(service.host) - if err != nil { - return err - } - - log.GetLogger().Infow("Connected to", "host", service.host) - - service.client = client - - fromBlock := service.startBlock - - addresses := CalculateAddresses(service.requests) - - log.GetLogger().Infow("Listen to these addresses", "addresses", addresses, "from_block", fromBlock) - - query := ethereum.FilterQuery{ - FromBlock: fromBlock, - Addresses: addresses, - } - - logsCh := make(chan types.Log) - defer close(logsCh) - sub, err := service.client.SubscribeFilterLogs(context.Background(), query, logsCh) - if err != nil { - log.GetLogger().Errorw("Failed to subscribe filter logs", "err", err) - return err - } - defer sub.Unsubscribe() - - for { - select { - case err := <-sub.Err(): - log.GetLogger().Errorw("Failed to listen the subscription", "err", err) - return err - case vLog := <-logsCh: - key := serializeEventRequestWithAddressAndABI(vLog.Address, vLog.Topics[0]) - request := service.RequestByKey(key) - if request != nil { - if service.CanProcess(&vLog) { - request.Callback(&vLog) - } - } - } - } -} - -func (service *EventService) Call(msg ethereum.CallMsg) ([]byte, error) { - var data []byte - var err error - for i := 0; i < 5; i++ { - data, err = service.client.CallContract(context.Background(), msg, nil) - if err != nil { - continue - } - log.GetLogger().Infow("Result when calling the contract", "data", string(data)) - break - } - return data, err -} - -func (service *EventService) GetBlockByHash(blockHash common.Hash) (*types.Block, error) { - var block *types.Block - var err error - for i := 0; i < 5; i++ { - block, err = service.client.BlockByHash(context.Background(), blockHash) - if err == nil { - break - } - } - if err != nil { - log.GetLogger().Errorw("Failed to retrieve block", "err", err) - return nil, err - } - return block, nil -} - -func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI common.Hash) string { - result := fmt.Sprintf("%s:%s", address.String(), hashedABI) - return result -} diff --git a/internal/pkg/listener/request.go b/internal/pkg/listener/request.go index 9d03e21..3d1f06d 100644 --- a/internal/pkg/listener/request.go +++ b/internal/pkg/listener/request.go @@ -4,31 +4,54 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" ) +type Notifier interface { + NotifyWithReTry(title string, text string) + Notify(title string, text string) error + Enable() + Disable() +} type EventRequest struct { contractAddress common.Address eventABI string - handler func(vLog *types.Log) + handler func(vLog *types.Log) (string, string, error) + notifier Notifier } -func (request *EventRequest) SerializeEventRequest() string { - hashedABI := crypto.Keccak256Hash([]byte(request.eventABI)) - return serializeEventRequestWithAddressAndABI(request.contractAddress, hashedABI) +func (r *EventRequest) SerializeEventRequest() string { + hashedABI := crypto.Keccak256Hash([]byte(r.eventABI)) + return serializeEventRequestWithAddressAndABI(r.contractAddress, hashedABI) } -func (request *EventRequest) GetRequestType() int { +func (r *EventRequest) GetRequestType() int { return RequestEventType } -func (request *EventRequest) Callback(v interface{}) { +func (r *EventRequest) Callback(v any) { if v, ok := v.(*types.Log); ok { - request.handler(v) + title, text, err := r.handler(v) + if err != nil { + log.GetLogger().Errorw("Failed to handle event request", "err", err, "log", v) + return + } + + err = r.notifier.Notify(title, text) + if err != nil { + log.GetLogger().Errorw("Failed to notify event request", "err", err, "log", v) + return + } } } -func MakeEventRequest(addr string, eventABI string, handler func(vLog *types.Log)) *EventRequest { +func MakeEventRequest(notifier Notifier, addr string, eventABI string, handler func(vLog *types.Log) (string, string, error)) *EventRequest { address := common.HexToAddress(addr) - return &EventRequest{contractAddress: address, eventABI: eventABI, handler: handler} + return &EventRequest{ + contractAddress: address, + eventABI: eventABI, + handler: handler, + notifier: notifier, + } } diff --git a/internal/pkg/listener/service.go b/internal/pkg/listener/service.go new file mode 100644 index 0000000..2146221 --- /dev/null +++ b/internal/pkg/listener/service.go @@ -0,0 +1,396 @@ +package listener + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + "math" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + ethereumTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var ( + RequestEventType = 1 +) + +const ( + MaxBatchBlocksSize = 10 +) + +type RequestSubscriber interface { + GetRequestType() int + SerializeEventRequest() string + Callback(item any) +} + +type BlockKeeper interface { + Head(ctx context.Context) (*ethereumTypes.Header, error) + SetHead(ctx context.Context, newHeader *ethereumTypes.Header, replaceHash common.Hash) error + Contains(header *ethereumTypes.Header) bool + GetReorgHeaders(ctx context.Context, header *ethereumTypes.Header) ([]*ethereumTypes.Header, []common.Hash, error) +} + +type BlockChainSource interface { + SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) + BlockNumber(ctx context.Context) (uint64, error) + GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) + GetBlocks(ctx context.Context, withLogs bool, fromBlock, toBlock uint64) ([]*types.NewBlock, error) +} + +type EventService struct { + l *zap.SugaredLogger + bcClient BlockChainSource + blockKeeper BlockKeeper + requestMap map[string]RequestSubscriber + filter *CounterBloom + sub ethereum.Subscription +} + +func MakeService(name string, bcClient BlockChainSource, keeper BlockKeeper) (*EventService, error) { + service := &EventService{ + l: log.GetLogger().Named(name), + bcClient: bcClient, + blockKeeper: keeper, + filter: MakeDefaultCounterBloom(), + requestMap: make(map[string]RequestSubscriber), + } + + return service, nil +} + +func (s *EventService) existRequest(request RequestSubscriber) bool { + key := request.SerializeEventRequest() + _, ok := s.requestMap[key] + return ok +} + +func (s *EventService) RequestByKey(key string) RequestSubscriber { + request, ok := s.requestMap[key] + if ok { + return request + } + return nil +} + +func (s *EventService) AddSubscribeRequest(request RequestSubscriber) { + if s.existRequest(request) { + return + } + key := request.SerializeEventRequest() + s.requestMap[key] = request +} + +func (s *EventService) CanProcess(log *ethereumTypes.Log) bool { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + + err := enc.Encode(log) + if err != nil { + return false + } + data := buf.Bytes() + + if s.filter.Test(data) { + return false + } + + s.filter.Add(data) + return true +} + +func (s *EventService) Start(ctx context.Context) error { + oldBlocksCh := make(chan *types.NewBlock) + + errCh := make(chan error, 1) + s.l.Infow("Start to sync old blocks") + + g, _ := errgroup.WithContext(ctx) + + g.Go(func() error { + err := s.syncOldBlocks(ctx, oldBlocksCh) + defer close(oldBlocksCh) + + if err != nil { + s.l.Errorw("Failed to sync old blocks", "err", err) + return err + } + + return nil + }) + + for oldBlock := range oldBlocksCh { + err := s.handleNewBlock(ctx, oldBlock) + if err != nil { + s.l.Errorw("Failed to handle the old block", "err", err) + return err + } + } + + if err := g.Wait(); err != nil { + s.l.Errorw("Failed to sync old blocks", "err", err) + return err + } + + s.sub = event.ResubscribeErr(10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + s.l.Errorw("Failed to re-subscribe the event", "err", err) + } + + return s.subscribeNewHead(ctx) + }) + + go func() { + err, ok := <-s.sub.Err() + if !ok { + return + } + s.l.Errorw("Failed to subscribe new head", "err", err) + + errCh <- err + }() + + for { + select { + case <-ctx.Done(): + return nil + case err := <-errCh: + s.l.Errorw("Failed to re-subscribe the event", "err", err) + return err + } + } +} + +func (s *EventService) subscribeNewHead( + _ context.Context, +) (ethereum.Subscription, error) { + headChanges := make(chan *ethereumTypes.Header, 64) + + ctx := context.Background() + + sub, err := s.bcClient.SubscribeNewHead(ctx, headChanges) + if err != nil { + return nil, err + } + s.l.Infow("Start process new head") + + newSub := event.NewSubscription(func(quit <-chan struct{}) error { + eventsCtx, cancelFunc := context.WithCancel(ctx) + defer sub.Unsubscribe() + defer cancelFunc() + + go func() { + select { + case <-quit: + cancelFunc() + case <-eventsCtx.Done(): // don't wait for quit signal if we closed for other reasons. + return + } + }() + + for { + select { + case newHead := <-headChanges: + s.l.Infow("New head received", "header", newHead.Number) + + logs, err := s.bcClient.GetLogs(ctx, newHead.Hash()) + if err != nil { + s.l.Errorw("Failed to filter logs", "err", err) + return err + } + + err = s.handleNewBlock(ctx, &types.NewBlock{ + Logs: logs, + Header: newHead, + }) + + if err != nil { + s.l.Errorw("Failed to handle the new head", "err", err) + return err + } + + case <-eventsCtx.Done(): + return nil + case subErr := <-sub.Err(): + return subErr + } + } + }) + + return newSub, nil +} + +func (s *EventService) handleNewBlock(ctx context.Context, newBlock *types.NewBlock) error { + if newBlock == nil { + return nil + } + + newHeader := newBlock.Header + reorgedBlocks, err := s.handleReorgBlocks(ctx, newHeader) + if err != nil { + s.l.Errorw("Failed to handle re-org blocks", "err", err) + return err + } + + blocks := make([]*types.NewBlock, 0) + if len(reorgedBlocks) > 0 { + blocks = append(blocks, reorgedBlocks...) + } + + blocks = append(blocks, newBlock) + + for _, block := range blocks { + err = s.filterEventsAndNotify(ctx, block.Logs) + if err != nil { + s.l.Errorw("Failed to handle block", "err", err, "block", block) + return err + } + + err = s.blockKeeper.SetHead(ctx, block.Header, block.ReorgedBlockHash) + if err != nil { + s.l.Errorw("Failed to set head on the keeper", "err", err, "block", block) + return err + } + } + + return nil +} + +func (s *EventService) filterEventsAndNotify(_ context.Context, logs []ethereumTypes.Log) error { + for _, l := range logs { + if len(l.Topics) == 0 { + continue + } + + key := serializeEventRequestWithAddressAndABI(l.Address, l.Topics[0]) + request := s.RequestByKey(key) + + if request == nil { + continue + } + + if l.Removed { + continue + } + + if !s.CanProcess(&l) { + continue + } + + request.Callback(&l) + } + return nil +} + +func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.NewBlock) error { + onchainBlockNo, err := s.bcClient.BlockNumber(ctx) + if err != nil { + return err + } + + consumingBlock, err := s.blockKeeper.Head(ctx) + if err != nil { + s.l.Errorw("Failed to get block head from keeper", "err", err) + return err + } + + if consumingBlock == nil { + return nil + } + + consumedBlockNo := consumingBlock.Number.Uint64() + + if consumedBlockNo >= onchainBlockNo { + return nil + } + + s.l.Infow("Fetch old blocks", "consumed_block", consumedBlockNo, "onchain_block", onchainBlockNo) + + blocksNeedToConsume := onchainBlockNo - consumedBlockNo + + totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize))) + + s.l.Infow("Total batches", "total", totalBatches) + skip := consumedBlockNo + 1 + for i := 0; i < totalBatches; i++ { + fromBlock := skip + toBlock := skip + MaxBatchBlocksSize - 1 + + if toBlock > onchainBlockNo { + toBlock = onchainBlockNo + } + + blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock) + if err != nil { + return err + } + + for _, oldHead := range blocks { + headCh <- oldHead + } + skip = toBlock + 1 + } + + return nil +} + +func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereumTypes.Header) ([]*types.NewBlock, error) { + newBlocks, reorgedBlockHashes, err := s.blockKeeper.GetReorgHeaders(ctx, newHeader) + if err != nil { + s.l.Errorw("Failed to handle reorg blocks", "err", err) + return nil, err + } + + if len(newBlocks) == 0 { + return nil, nil + } + + if len(reorgedBlockHashes) != len(newBlocks) { + return nil, fmt.Errorf("reorged block numbers don't match") + } + + var g errgroup.Group + + reorgedBlocks := make([]*types.NewBlock, len(newBlocks)) + for i, newBlock := range newBlocks { + s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64()) + i := i + newBlock := newBlock + + g.Go(func() error { + blockHash := newBlock.Hash() + reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash) + if errLogs != nil { + s.l.Errorw("Failed to get logs", "err", errLogs) + return errLogs + } + + reorgedBlocks[i] = &types.NewBlock{ + Header: newBlock, + Logs: reorgedLogs, + ReorgedBlockHash: reorgedBlockHashes[i], + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + return nil, err + } + + return reorgedBlocks, nil +} + +func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI common.Hash) string { + result := fmt.Sprintf("%s:%s", address.String(), hashedABI) + return result +} diff --git a/internal/pkg/listener/utils.go b/internal/pkg/listener/utils.go index 0d9fc64..20f7ce0 100644 --- a/internal/pkg/listener/utils.go +++ b/internal/pkg/listener/utils.go @@ -6,7 +6,7 @@ import ( func CalculateAddresses(requests []RequestSubscriber) []common.Address { encountered := map[common.Address]bool{} - result := []common.Address{} + result := make([]common.Address, 0) for _, v := range requests { if v.GetRequestType() == RequestEventType { diff --git a/internal/pkg/notification/slack.go b/internal/pkg/notification/slack.go index e7f488d..614c5fb 100644 --- a/internal/pkg/notification/slack.go +++ b/internal/pkg/notification/slack.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "os" "time" "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" @@ -26,10 +25,6 @@ func MakeSlackNotificationService(url string, numOfRetry int) *SlackNotification return &SlackNotificationService{url: url, numOfRetry: numOfRetry, off: false} } -func MakeDefaultSlackNotificationService() *SlackNotificationService { - return &SlackNotificationService{url: os.Getenv("SLACK_URL"), numOfRetry: 5, off: os.Getenv("OFF") == "1"} -} - func (slackNotificationService *SlackNotificationService) Enable() { slackNotificationService.off = false } diff --git a/internal/pkg/queue/circular_queue.go b/internal/pkg/queue/circular_queue.go new file mode 100644 index 0000000..5c79bda --- /dev/null +++ b/internal/pkg/queue/circular_queue.go @@ -0,0 +1,112 @@ +package queue + +import ( + "errors" +) + +// CircularQueue represents a circular queue with a fixed size. +type CircularQueue[T comparable] struct { + size int + data []T + uniqueData map[T]bool + front int + rear int + count int +} + +// NewCircularQueue creates a new CircularQueue with the specified size. +func NewCircularQueue[T comparable](size int) *CircularQueue[T] { + return &CircularQueue[T]{ + size: size, + data: make([]T, size), + uniqueData: make(map[T]bool), + rear: -1, // rear starts at -1 to handle the first increment correctly + } +} + +// Enqueue adds an element to the end of the queue. +// If the queue is full, it overwrites the oldest element. +func (q *CircularQueue[T]) Enqueue(value T) { + if q.IsFull() { + // When the queue is full, we overwrite the oldest item. + q.front = (q.front + 1) % q.size + } else { + q.count++ + } + q.rear = (q.rear + 1) % q.size + q.data[q.rear] = value + q.uniqueData[value] = true +} + +// RemoveAndEnqueue removes and adds an element to the end of the queue. +// If the queue is full, it overwrites the oldest element. +func (q *CircularQueue[T]) RemoveAndEnqueue(value T, removed T) { + q.Remove(removed) + q.Enqueue(value) +} + +func (q *CircularQueue[T]) Remove(value T) { + if !q.Contains(value) { + return + } + + // Find the index of the element to remove + var foundIndex int + for i := 0; i < q.count; i++ { + index := (q.front + i) % q.size + if q.data[index] == value { + foundIndex = index + break + } + } + + // Shift elements to fill the gap + for i := foundIndex; i != q.rear; i = (i + 1) % q.size { + nextIndex := (i + 1) % q.size + q.data[i] = q.data[nextIndex] + } + + // Clear the slot at the rear + var zeroValue T + q.data[q.rear] = zeroValue + + // Update rear + q.rear = (q.rear - 1 + q.size) % q.size + q.count-- + delete(q.uniqueData, value) +} + +// Dequeue removes and returns the element at the front of the queue. +func (q *CircularQueue[T]) Dequeue() (T, error) { + var zeroValue T + if q.IsEmpty() { + return zeroValue, errors.New("queue is empty") + } + value := q.data[q.front] + q.data[q.front] = zeroValue // Clear the slot + delete(q.uniqueData, value) + q.front = (q.front + 1) % q.size + q.count-- + + return value, nil +} + +// Contains checks if a specified element exists in the queue. +func (q *CircularQueue[T]) Contains(value T) bool { + return q.uniqueData[value] +} + +// Size returns the number of elements in the queue. +func (q *CircularQueue[T]) Size() int { + return q.count +} + +// IsEmpty checks if the queue is empty. +func (q *CircularQueue[T]) IsEmpty() bool { + return q.count == 0 +} + +// IsFull checks if the queue is full. +func (q *CircularQueue[T]) IsFull() bool { + return q.count == q.size +} diff --git a/internal/pkg/queue/circular_queue_test.go b/internal/pkg/queue/circular_queue_test.go new file mode 100644 index 0000000..9fac01e --- /dev/null +++ b/internal/pkg/queue/circular_queue_test.go @@ -0,0 +1,53 @@ +package queue_test + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/queue" +) + +func Test_CircularQueue(t *testing.T) { + q := queue.NewCircularQueue[string](64) + + // Example of adding maps to the queue. + for i := 0; i < 70; i++ { + q.Enqueue(strconv.Itoa(i)) + } + + // Example of removing maps from the queue. + count := 0 + for !q.IsEmpty() { + item, err := q.Dequeue() + if err != nil { + t.Errorf("Failed to dequeue, err: %s", err.Error()) + } else { + t.Logf("Item: %s", item) + } + count++ + } + assert.Equal(t, 64, count) +} + +func Test_CircularQueue_Remove(t *testing.T) { + q := queue.NewCircularQueue[string](64) + + for i := 0; i < 70; i++ { + q.Enqueue(strconv.Itoa(i)) + } + + q.Remove("68") + assert.Equal(t, false, q.Contains("68")) + assert.Equal(t, 63, q.Size()) + + q.Remove("69") + assert.Equal(t, false, q.Contains("69")) + assert.Equal(t, 62, q.Size()) + + q.RemoveAndEnqueue("70", "67") + assert.Equal(t, true, q.Contains("70")) + assert.Equal(t, false, q.Contains("67")) + assert.Equal(t, 62, q.Size()) +} diff --git a/internal/pkg/redis/client.go b/internal/pkg/redis/client.go new file mode 100644 index 0000000..31882dd --- /dev/null +++ b/internal/pkg/redis/client.go @@ -0,0 +1,28 @@ +package redis + +import ( + "context" + "errors" + "strings" + + "github.com/go-redis/redis/v8" +) + +func New(ctx context.Context, redisConfig Config) (redis.UniversalClient, error) { + redisAddresses := strings.Split(redisConfig.Addresses, ",") + if len(redisAddresses) == 0 { + return nil, errors.New("redis host is empty") + } + + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{ + Password: redisConfig.Password, + Addrs: redisAddresses, + DB: 0, + }) + + if _, err := redisClient.Ping(ctx).Result(); err != nil { + return nil, err + } + + return redisClient, nil +} diff --git a/internal/pkg/redis/config.go b/internal/pkg/redis/config.go new file mode 100644 index 0000000..571c12a --- /dev/null +++ b/internal/pkg/redis/config.go @@ -0,0 +1,7 @@ +package redis + +type Config struct { + Addresses string `json:"addresses"` + Password string `json:"password"` + MasterName string `json:"master_name"` +} diff --git a/internal/pkg/repository/block_keeper.go b/internal/pkg/repository/block_keeper.go new file mode 100644 index 0000000..8dbab98 --- /dev/null +++ b/internal/pkg/repository/block_keeper.go @@ -0,0 +1,190 @@ +package repository + +import ( + "context" + "fmt" + "sort" + + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + + "github.com/ethereum/go-ethereum/common" + ethereumTypes "github.com/ethereum/go-ethereum/core/types" + + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/constant" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/queue" + "github.com/tokamak-network/tokamak-thanos-event-listener/pkg/log" +) + +const ( + TwoEpochBlocks = 64 + batchBlocksSize = uint64(10) +) + +type SyncBlockMetadataKeeper interface { + GetHead(ctx context.Context) (string, error) + SetHead(ctx context.Context, blockHash string) error +} + +type BlockChainSource interface { + HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) + GetBlocks(ctx context.Context, withLogs bool, fromBlock, toBlock uint64) ([]*types.NewBlock, error) + GetHeader(ctx context.Context) (*ethereumTypes.Header, error) +} + +type BlockKeeper struct { + bcSource BlockChainSource + syncBlockMetadataKeeper SyncBlockMetadataKeeper + head *ethereumTypes.Header + q *queue.CircularQueue[string] + blocks map[uint64]common.Hash +} + +func NewBlockKeeper(ctx context.Context, bcSource BlockChainSource, syncBlockMetadataKeeper SyncBlockMetadataKeeper) (*BlockKeeper, error) { + keeper := &BlockKeeper{ + bcSource: bcSource, + syncBlockMetadataKeeper: syncBlockMetadataKeeper, + head: nil, + blocks: make(map[uint64]common.Hash), + q: queue.NewCircularQueue[string](TwoEpochBlocks), + } + + currentBlockHash, err := syncBlockMetadataKeeper.GetHead(ctx) + if err != nil { + log.GetLogger().Errorw("Failed to get head", "err", err) + return nil, err + } + + var ( + head *ethereumTypes.Header + blockNo uint64 + ) + if currentBlockHash != "" { + head, err = bcSource.HeaderAtBlockHash(ctx, common.HexToHash(currentBlockHash)) + if err != nil { + log.GetLogger().Errorw("Failed to get head by block hash", "err", err, "hash", currentBlockHash) + return nil, err + } + blockNo = head.Number.Uint64() + keeper.head = head + } else { + currentHeader, err := bcSource.GetHeader(ctx) + if err != nil { + log.GetLogger().Errorw("Failed to get head", "err", err) + return nil, err + } + currentBlockHash = currentHeader.Hash().Hex() + blockNo = currentHeader.Number.Uint64() + + err = keeper.SetHead(ctx, currentHeader, constant.ZeroHash) + if err != nil { + log.GetLogger().Errorw("Failed to set head", "err", err) + return nil, err + } + } + + for i := uint64(blockNo) - TwoEpochBlocks + 1; i < blockNo; i = i + batchBlocksSize { + from := i + to := i + batchBlocksSize - 1 + if to > blockNo-1 { + to = blockNo - 1 + } + + blocks, err := bcSource.GetBlocks(ctx, false, from, to) + if err != nil { + return nil, err + } + + for _, block := range blocks { + keeper.enqueue(block.Header.Hash(), block.Header.Number.Uint64()) + } + } + + keeper.enqueue(common.HexToHash(currentBlockHash), blockNo) + + log.GetLogger().Infow("Queue info", "size", keeper.q.Size(), "is_full", keeper.q.IsFull()) + + return keeper, nil +} + +func (bk *BlockKeeper) Head(_ context.Context) (*ethereumTypes.Header, error) { + return bk.head, nil +} + +func (bk *BlockKeeper) SetHead(ctx context.Context, header *ethereumTypes.Header, removedBlockHash common.Hash) error { + log.GetLogger().Infow("Set head", "new", header.Hash(), "removed", removedBlockHash.Hex()) + bk.head = header + + if removedBlockHash.Cmp(constant.ZeroHash) != 0 { + bk.q.RemoveAndEnqueue(header.Hash().String(), removedBlockHash.String()) + } else { + bk.q.Enqueue(header.Hash().String()) + } + + err := bk.syncBlockMetadataKeeper.SetHead(ctx, header.Hash().String()) + if err != nil { + log.GetLogger().Errorw("Failed to set head", "err", err) + return nil + } + + return nil +} + +func (bk *BlockKeeper) Contains(header *ethereumTypes.Header) bool { + return bk.q.Contains(header.Hash().String()) +} + +func (bk *BlockKeeper) GetReorgHeaders(ctx context.Context, header *ethereumTypes.Header) ([]*ethereumTypes.Header, []common.Hash, error) { + if bk.head == nil { + return nil, nil, nil + } + + if header.ParentHash.Cmp(constant.ZeroHash) == 0 { + return nil, nil, nil + } + + if header.ParentHash.Cmp(bk.head.Hash()) == 0 { + return nil, nil, nil + } + + parentHash := header.ParentHash + newHeaders := make([]*ethereumTypes.Header, 0) + removedBlockHashes := make([]common.Hash, 0) + + for { + if bk.q.Contains(parentHash.Hex()) { + sort.Slice(newHeaders, func(i, j int) bool { + blockI := newHeaders[i] + blockJ := newHeaders[j] + + return blockI.Number.Cmp(blockJ.Number) < 0 + }) + + return newHeaders, removedBlockHashes, nil + } + block, err := bk.bcSource.HeaderAtBlockHash(ctx, parentHash) + if err != nil { + log.GetLogger().Errorw("Failed to get head by block hash", "err", err) + return nil, nil, err + } + if block == nil { + return nil, nil, fmt.Errorf("block not found: %v", parentHash) + } + + newHeaders = append(newHeaders, block) + + blockNo := block.Number.Uint64() + + if removedBlockHash, ok := bk.blocks[blockNo]; ok { + removedBlockHashes = append(removedBlockHashes, removedBlockHash) + } else { + removedBlockHashes = append(removedBlockHashes, constant.ZeroHash) + } + + parentHash = block.ParentHash + } +} + +func (bk *BlockKeeper) enqueue(blockHash common.Hash, blockNumber uint64) { + bk.q.Enqueue(blockHash.String()) + bk.blocks[blockNumber] = blockHash +} diff --git a/internal/pkg/repository/sync_block_metadata.go b/internal/pkg/repository/sync_block_metadata.go new file mode 100644 index 0000000..8cdc90f --- /dev/null +++ b/internal/pkg/repository/sync_block_metadata.go @@ -0,0 +1,48 @@ +package repository + +import ( + "context" + "errors" + "fmt" + + "github.com/go-redis/redis/v8" +) + +const ( + syncBlockMetadataKey = "syncBlockMetadata" +) + +type SyncBlockMetadataRepository struct { + prefix string + redisClient redis.UniversalClient +} + +func NewSyncBlockMetadataRepository(prefix string, redisClient redis.UniversalClient) *SyncBlockMetadataRepository { + return &SyncBlockMetadataRepository{ + redisClient: redisClient, + prefix: prefix, + } +} +func (r *SyncBlockMetadataRepository) GetHead(ctx context.Context) (string, error) { + result, err := r.redisClient.Get(ctx, r.getKey()).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return "", nil + } + return "", err + } + + return result, nil +} + +func (r *SyncBlockMetadataRepository) SetHead(ctx context.Context, blockHash string) error { + err := r.redisClient.Set(ctx, r.getKey(), blockHash, -1).Err() + if err != nil { + return err + } + return nil +} + +func (r *SyncBlockMetadataRepository) getKey() string { + return fmt.Sprintf("%s:%s", r.prefix, syncBlockMetadataKey) +} diff --git a/internal/pkg/types/newblock.go b/internal/pkg/types/newblock.go new file mode 100644 index 0000000..8a68e79 --- /dev/null +++ b/internal/pkg/types/newblock.go @@ -0,0 +1,12 @@ +package types + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type NewBlock struct { + Header *types.Header + Logs []types.Log + ReorgedBlockHash common.Hash +} diff --git a/internal/pkg/types/token.go b/internal/pkg/types/token.go new file mode 100644 index 0000000..7a2efb0 --- /dev/null +++ b/internal/pkg/types/token.go @@ -0,0 +1,7 @@ +package types + +type Token struct { + Symbol string `json:"symbol"` + Decimals int `json:"decimals"` + Address string `json:"address"` +}