diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index e1c5131..09c3ebb 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/common" @@ -43,6 +44,11 @@ type FinishedTaskStatus struct { TransactionIndex uint `json:"transactionIndex"` } +type OperatorStatus struct { + LastTime int64 `json:"lastTime"` + OperatorId bls.OperatorId `json:"operatorId"` +} + // Aggregator sends tasks (numbers to square) onchain, then listens for operator signed TaskResponses. // It aggregates responses signatures, and if any of the TaskResponses reaches the QuorumThresholdPercentage for each quorum // (currently we only use a single quorum of the ERC20Mock token), it sends the aggregated TaskResponse and signature onchain. @@ -90,6 +96,10 @@ type Aggregator struct { finishedTasksMu sync.RWMutex nextTaskIndex types.TaskIndex nextTaskIndexMu sync.RWMutex + + cfg *config.Config + operatorStatus map[common.Address]*OperatorStatus + operatorStatusMu sync.RWMutex } // NewAggregator creates a new Aggregator with the provided config. @@ -134,6 +144,8 @@ func NewAggregator(c *config.Config) (*Aggregator, error) { blsAggregationService: blsAggregationService, tasks: make(map[types.TaskIndex]*message.AlertTaskInfo), finishedTasks: make(map[[32]byte]*FinishedTaskStatus), + operatorStatus: make(map[common.Address]*OperatorStatus), + cfg: c, }, nil } diff --git a/aggregator/cmd/main.go b/aggregator/cmd/main.go index 3fa5740..17dedbc 100644 --- a/aggregator/cmd/main.go +++ b/aggregator/cmd/main.go @@ -21,7 +21,6 @@ var ( ) func main() { - app := cli.NewApp() app.Flags = config.Flags app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate) @@ -37,7 +36,6 @@ func main() { } func aggregatorMain(ctx *cli.Context) error { - log.Println("Initializing Aggregator") config, err := config.NewConfig(ctx) if err != nil { @@ -60,5 +58,4 @@ func aggregatorMain(ctx *cli.Context) error { } return nil - } diff --git a/aggregator/rpc_server.go b/aggregator/rpc_server.go index 1e547b5..10fff2f 100644 --- a/aggregator/rpc_server.go +++ b/aggregator/rpc_server.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "net/rpc" + "time" "github.com/alt-research/avs/aggregator/types" "github.com/alt-research/avs/core/message" @@ -74,6 +75,46 @@ func (agg *Aggregator) GetFinishedTaskByAlertHash(alertHash [32]byte) *FinishedT return agg.finishedTasks[alertHash] } +// rpc endpoint which is called by operator +// will init operator, just for keep config valid +func (agg *Aggregator) InitOperator(req *message.InitOperatorRequest, reply *message.InitOperatorResponse) error { + agg.logger.Infof("Received InitOperator: %#v", req) + + reply.Ok = false + + if agg.cfg.OperatorStateRetrieverAddr != req.OperatorStateRetrieverAddr { + reply.Res = fmt.Sprintf("OperatorStateRetrieverAddr invaild, expect %s", agg.cfg.OperatorStateRetrieverAddr.Hex()) + return nil + } + + if agg.cfg.RegistryCoordinatorAddr != req.RegistryCoordinatorAddr { + reply.Res = fmt.Sprintf("RegistryCoordinatorAddr invaild, expect %s", agg.cfg.RegistryCoordinatorAddr.Hex()) + return nil + } + + if agg.cfg.Layer1ChainId != req.Layer1ChainId { + reply.Res = fmt.Sprintf("Layer1ChainId invaild, expect %d", agg.cfg.Layer1ChainId) + return nil + } + + if agg.cfg.Layer2ChainId != req.ChainId { + reply.Res = fmt.Sprintf("Layer2ChainId invaild, expect %d", agg.cfg.Layer2ChainId) + return nil + } + + agg.operatorStatusMu.Lock() + defer agg.operatorStatusMu.Unlock() + + agg.operatorStatus[req.OperatorAddress] = &OperatorStatus{ + LastTime: time.Now().Unix(), + OperatorId: req.OperatorId, + } + + agg.logger.Infof("new operator status: %s", req.OperatorAddress.Hex()) + + return nil +} + // rpc endpoint which is called by operator // will try to init the task, if currently had a same task for the alert, // it will return the existing task. diff --git a/config-files/aggregator.yaml b/config-files/aggregator.yaml index 74adba2..dd7d56b 100644 --- a/config-files/aggregator.yaml +++ b/config-files/aggregator.yaml @@ -2,5 +2,12 @@ environment: production eth_rpc_url: http://localhost:8545 eth_ws_url: ws://localhost:8545 + # address which the aggregator listens on for operator signed messages aggregator_server_ip_port_address: localhost:8090 + +# the layer1 chain id the avs contracts in +layer1_chain_id: 31337 + +# the layer2 chain id +layer2_chain_id: 20240219 diff --git a/core/config/avs_config.go b/core/config/avs_config.go index 13ba692..59d53e3 100644 --- a/core/config/avs_config.go +++ b/core/config/avs_config.go @@ -17,4 +17,6 @@ type NodeConfig struct { OperatorServerIpPortAddr string `yaml:"operator_server_ip_port_addr"` MetadataURI string `yaml:"metadata_uri"` OperatorSocket string `yaml:"operator_socket"` + Layer1ChainId uint32 `yaml:"layer1_chain_id"` + Layer2ChainId uint32 `yaml:"layer2_chain_id"` } diff --git a/core/config/config.go b/core/config/config.go index 5c17be1..07f2017 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "errors" + "fmt" "os" "github.com/ethereum/go-ethereum/common" @@ -36,6 +37,8 @@ type Config struct { OperatorStateRetrieverAddr common.Address RegistryCoordinatorAddr common.Address AggregatorServerIpPortAddr string + Layer1ChainId uint32 + Layer2ChainId uint32 // json:"-" skips this field when marshaling (only used for logging to stdout), since SignerFn doesnt implement marshalJson SignerFn signerv2.SignerFn `json:"-"` PrivateKey *ecdsa.PrivateKey `json:"-"` @@ -49,6 +52,8 @@ type ConfigRaw struct { EthRpcUrl string `yaml:"eth_rpc_url"` EthWsUrl string `yaml:"eth_ws_url"` AggregatorServerIpPortAddr string `yaml:"aggregator_server_ip_port_address"` + Layer1ChainId uint32 `yaml:"layer1_chain_id"` + Layer2ChainId uint32 `yaml:"layer2_chain_id"` } // These are read from DeploymentFileFlag @@ -116,6 +121,17 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return nil, err } + layer1ChainIdFromRpc, err := ethRpcClient.ChainID(context.Background()) + if err != nil { + logger.Errorf("Cannot got chain id from eth rpc client", "err", err) + return nil, err + } + + if layer1ChainIdFromRpc.Uint64() != uint64(configRaw.Layer1ChainId) { + logger.Errorf("The layer1 chain id not expect", "layer1 rpc", layer1ChainIdFromRpc, "config", configRaw.Layer1ChainId) + return nil, fmt.Errorf("layer1 chain id not expect") + } + ethWsClient, err := eth.NewClient(configRaw.EthWsUrl) if err != nil { logger.Errorf("Cannot create ws ethclient", "err", err) @@ -168,6 +184,8 @@ func NewConfig(ctx *cli.Context) (*Config, error) { PrivateKey: ecdsaPrivateKey, TxMgr: txMgr, AggregatorAddress: aggregatorAddr, + Layer1ChainId: configRaw.Layer1ChainId, + Layer2ChainId: configRaw.Layer2ChainId, } config.validate() return config, nil diff --git a/core/message/task.go b/core/message/task.go index 2173826..6fa22a9 100644 --- a/core/message/task.go +++ b/core/message/task.go @@ -3,6 +3,7 @@ package message import ( csservicemanager "github.com/alt-research/avs/contracts/bindings/MachServiceManager" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" "golang.org/x/crypto/sha3" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -92,6 +93,22 @@ func (a AlertTaskInfo) ToIMachServiceManagerAlertHeader() csservicemanager.IMach } } +// The init operator request +type InitOperatorRequest struct { + Layer1ChainId uint32 + ChainId uint32 + OperatorId bls.OperatorId + OperatorAddress common.Address + OperatorStateRetrieverAddr common.Address + RegistryCoordinatorAddr common.Address +} + +// The init operator response +type InitOperatorResponse struct { + Ok bool + Res string +} + // The Alert task create request type CreateTaskRequest struct { AlertHash [32]byte diff --git a/operator/operator.go b/operator/operator.go index a766080..b292d4d 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -157,6 +158,26 @@ func withEnvConfig(c config.NodeConfig) config.NodeConfig { c.OperatorSocket = operatorSocket } + layer1ChainId, ok := os.LookupEnv("LAYER1_CHAIN_ID") + if ok && layer1ChainId != "" { + layer1ChainId, err := strconv.Atoi(layer1ChainId) + if err != nil { + panic(fmt.Sprintf("layer1_chain_id parse error: %v", err)) + } + + c.Layer1ChainId = uint32(layer1ChainId) + } + + layer2ChainId, ok := os.LookupEnv("LAYER2_CHAIN_ID") + if ok && layer2ChainId != "" { + layer2ChainId, err := strconv.Atoi(layer2ChainId) + if err != nil { + panic(fmt.Sprintf("layer2_chain_id parse error: %v", err)) + } + + c.Layer2ChainId = uint32(layer2ChainId) + } + configJson, err := json.MarshalIndent(c, "", " ") if err != nil { panic(err) @@ -299,7 +320,14 @@ func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) { AVS_NAME, logger, operatorAddress, quorumNames) reg.MustRegister(economicMetricsCollector) - aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, logger, avsAndEigenMetrics) + // OperatorId is set in contract during registration so we get it after registering operator. + operatorId, err := sdkClients.AvsRegistryChainReader.GetOperatorId(&bind.CallOpts{}, operatorAddress) + if err != nil { + logger.Error("Cannot get operator id", "err", err) + return nil, err + } + + aggregatorRpcClient, err := NewAggregatorRpcClient(c, operatorId, operatorAddress, logger, avsAndEigenMetrics) if err != nil { logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err) return nil, err @@ -331,17 +359,9 @@ func NewOperatorFromConfig(cfg config.NodeConfig) (*Operator, error) { newTaskCreatedChan: newTaskCreatedChan, serviceManagerAddr: common.HexToAddress(c.AVSRegistryCoordinatorAddress), metadataURI: c.MetadataURI, - operatorId: [32]byte{0}, // this is set below - + operatorId: operatorId, } - // OperatorId is set in contract during registration so we get it after registering operator. - operatorId, err := sdkClients.AvsRegistryChainReader.GetOperatorId(&bind.CallOpts{}, operator.operatorAddr) - if err != nil { - logger.Error("Cannot get operator id", "err", err) - return nil, err - } - operator.operatorId = operatorId logger.Info("Operator info", "operatorId", operatorId, "operatorAddr", operatorAddress, @@ -368,6 +388,14 @@ func (o *Operator) Start(ctx context.Context) error { o.logger.Infof("Starting operator.") + o.logger.Infof("Init operator to aggregator.") + err = o.aggregatorRpcClient.InitOperatorToAggregator() + if err != nil { + o.logger.Errorf("Init operator to aggregator failed: %v", err) + return err + } + o.logger.Infof("Init operator to aggregator succeeded.") + if o.config.EnableNodeApi { o.nodeApi.Start() } diff --git a/operator/rpc_client.go b/operator/rpc_client.go index 4c7d276..9ccb1b2 100644 --- a/operator/rpc_client.go +++ b/operator/rpc_client.go @@ -7,30 +7,44 @@ import ( "time" "github.com/alt-research/avs/core/alert" + "github.com/alt-research/avs/core/config" "github.com/alt-research/avs/core/message" "github.com/alt-research/avs/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" ) type AggregatorRpcClienter interface { + InitOperatorToAggregator() error CreateAlertTaskToAggregator(alertHash [32]byte) (*message.AlertTaskInfo, error) SendSignedTaskResponseToAggregator(signedTaskResponse *message.SignedTaskRespRequest, resChan chan alert.AlertResponse) } type AggregatorRpcClient struct { - rpcClient *rpc.Client - metrics metrics.Metrics - logger logging.Logger - aggregatorIpPortAddr string + rpcClient *rpc.Client + metrics metrics.Metrics + logger logging.Logger + config config.NodeConfig + operatorId bls.OperatorId + operatorAddr common.Address + OperatorStateRetrieverAddr common.Address + RegistryCoordinatorAddr common.Address + aggregatorIpPortAddr string } -func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger, metrics metrics.Metrics) (*AggregatorRpcClient, error) { +func NewAggregatorRpcClient(config config.NodeConfig, operatorId bls.OperatorId, operatorAddr common.Address, logger logging.Logger, metrics metrics.Metrics) (*AggregatorRpcClient, error) { return &AggregatorRpcClient{ // set to nil so that we can create an rpc client even if the aggregator is not running - rpcClient: nil, - metrics: metrics, - logger: logger, - aggregatorIpPortAddr: aggregatorIpPortAddr, + rpcClient: nil, + metrics: metrics, + logger: logger, + config: config, + operatorId: operatorId, + operatorAddr: operatorAddr, + OperatorStateRetrieverAddr: common.HexToAddress(config.OperatorStateRetrieverAddress), + RegistryCoordinatorAddr: common.HexToAddress(config.AVSRegistryCoordinatorAddress), + aggregatorIpPortAddr: config.AggregatorServerIpPortAddress, }, nil } @@ -43,6 +57,58 @@ func (c *AggregatorRpcClient) dialAggregatorRpcClient() error { return nil } +// CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. +func (c *AggregatorRpcClient) InitOperatorToAggregator() error { + if c.rpcClient == nil { + c.logger.Info("rpc client is nil. Dialing aggregator rpc client") + err := c.dialAggregatorRpcClient() + if err != nil { + c.logger.Error("Could not dial aggregator rpc client. Not sending signed task response header to aggregator. Is aggregator running?", "err", err) + return err + } + } + // we don't check this bool. It's just needed because rpc.Call requires rpc methods to have a return value + var reply message.InitOperatorResponse + req := message.InitOperatorRequest{ + Layer1ChainId: c.config.Layer1ChainId, + ChainId: c.config.Layer2ChainId, + OperatorId: c.operatorId, + OperatorAddress: c.operatorAddr, + OperatorStateRetrieverAddr: c.OperatorStateRetrieverAddr, + RegistryCoordinatorAddr: c.RegistryCoordinatorAddr, + } + + c.logger.Info("Create task header to aggregator", "req", fmt.Sprintf("%#v", req)) + + for i := 0; i < 5; i++ { + err := c.rpcClient.Call("Aggregator.CreateTask", req, &reply) + if err != nil { + c.logger.Info("Received error from aggregator", "err", err) + if strings.Contains(err.Error(), "already finished") { + return err + } + } else { + c.logger.Info("init operator accepted by aggregator.", "reply", reply) + c.metrics.IncNumTasksAcceptedByAggregator() + + if !reply.Ok { + if reply.Res != "" { + return fmt.Errorf("init operator failed by %s", reply.Res) + } else { + return fmt.Errorf("init operator failed by unknown") + } + } + + return nil + } + c.logger.Infof("Retrying in 2 seconds") + time.Sleep(2 * time.Second) + } + c.logger.Errorf("Could not send init operator to aggregator. Tried 5 times.") + + return fmt.Errorf("Could not send init operator to aggregator") +} + // CreateAlertTaskToAggregator create a new alert task, if had existing, just return current alert task. func (c *AggregatorRpcClient) CreateAlertTaskToAggregator(alertHash [32]byte) (*message.AlertTaskInfo, error) { if c.rpcClient == nil { diff --git a/ops/configs/aggregator-docker-compose.yaml b/ops/configs/aggregator-docker-compose.yaml index b6101e0..fa97998 100644 --- a/ops/configs/aggregator-docker-compose.yaml +++ b/ops/configs/aggregator-docker-compose.yaml @@ -2,5 +2,12 @@ environment: production eth_rpc_url: http://anvil:8545 eth_ws_url: ws://anvil:8545 + # address which the aggregator listens on for operator signed messages aggregator_server_ip_port_address: 0.0.0.0:8090 + +# the layer1 chain id the avs contracts in +layer1_chain_id: 31337 + +# the layer2 chain id +layer2_chain_id: 20240219