Skip to content

Commit

Permalink
feat: refactor aggregator rpc and upgrade eigenlayer-sdk (#86)
Browse files Browse the repository at this point in the history
* upgrade eigensdk for fix not collection sucess

* refactor rpc server, move to legacy and make exit wait
  • Loading branch information
fyInALT authored Apr 4, 2024
1 parent dc373c5 commit d7c0eb0
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 304 deletions.
162 changes: 40 additions & 122 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,10 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/common"

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys"
sdktypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/alt-research/avs/aggregator/types"
"github.com/alt-research/avs/aggregator/rpc"
"github.com/alt-research/avs/core"
"github.com/alt-research/avs/core/chainio"
"github.com/alt-research/avs/core/config"
Expand Down Expand Up @@ -82,90 +77,77 @@ type OperatorStatus struct {
// Upon sending a task onchain (or receiving a NewTaskCreated Event if the tasks were sent by an external task generator), the aggregator can get the list of all operators opted into each quorum at that
// block number by calling the getOperatorState() function of the BLSOperatorStateRetriever.sol contract.
type Aggregator struct {
logger logging.Logger
logger logging.Logger

serverIpPortAddr string
avsWriter chainio.AvsWriterer
avsReader chainio.AvsReaderer
ethClient eth.Client
// aggregation related fields
blsAggregationService blsagg.BlsAggregationService
tasks map[types.TaskIndex]*message.AlertTaskInfo
tasksMu sync.RWMutex
finishedTasks map[[32]byte]*FinishedTaskStatus
finishedTasksMu sync.RWMutex
nextTaskIndex types.TaskIndex
nextTaskIndexMu sync.RWMutex

cfg *config.Config
operatorStatus map[common.Address]*OperatorStatus
operatorStatusMu sync.RWMutex
service *AggregatorService
legacyRpc *rpc.LegacyRpcHandler
}

// NewAggregator creates a new Aggregator with the provided config.
func NewAggregator(c *config.Config) (*Aggregator, error) {

avsReader, err := chainio.BuildAvsReaderFromConfig(c)
if err != nil {
c.Logger.Error("Cannot create avsReader", "err", err)
return nil, err
}

avsWriter, err := chainio.BuildAvsWriterFromConfig(c)
if err != nil {
c.Logger.Errorf("Cannot create avsWriter", "err", err)
return nil, err
}

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: c.EthHttpRpcUrl,
EthWsUrl: c.EthWsRpcUrl,
RegistryCoordinatorAddr: c.RegistryCoordinatorAddr.String(),
OperatorStateRetrieverAddr: c.OperatorStateRetrieverAddr.String(),
AvsName: avsName,
PromMetricsIpPortAddress: ":9090",
}
clients, err := clients.BuildAll(chainioConfig, c.PrivateKey, c.Logger)
service, err := NewAggregatorService(c)
if err != nil {
c.Logger.Errorf("Cannot create sdk clients", "err", err)
c.Logger.Errorf("Cannot create NewAggregatorService", "err", err)
return nil, err
}

operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(context.Background(), clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, c.Logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, c.Logger)
blsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, c.Logger)
legacyRpc := rpc.NewLegacyRpcHandler(c.Logger, service)

return &Aggregator{
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
avsWriter: avsWriter,
avsReader: avsReader,
ethClient: clients.EthHttpClient,
blsAggregationService: blsAggregationService,
tasks: make(map[types.TaskIndex]*message.AlertTaskInfo),
finishedTasks: make(map[[32]byte]*FinishedTaskStatus),
operatorStatus: make(map[common.Address]*OperatorStatus),
cfg: c,
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
avsWriter: avsWriter,
service: service,
legacyRpc: legacyRpc,
}, nil
}

func (agg *Aggregator) Start(ctx context.Context) error {
func (agg *Aggregator) Start(ctx context.Context, wg *sync.WaitGroup) error {
defer func() {
agg.wait()
wg.Done()
}()

agg.logger.Infof("Starting aggregator.")
agg.logger.Infof("Starting aggregator rpc server.")
go agg.startServer(ctx)

agg.logger.Infof("Aggregator set to send new task every 10 seconds...")
agg.startRpcServer(ctx)

agg.logger.Info("Aggregator set to send new task every 10 seconds...")
for {
select {
case <-ctx.Done():
return nil
case blsAggServiceResp := <-agg.blsAggregationService.GetResponseChannel():
case blsAggServiceResp := <-agg.service.GetResponseChannel():
agg.logger.Info("Received response from blsAggregationService", "blsAggServiceResp", blsAggServiceResp)
agg.sendAggregatedResponseToContract(blsAggServiceResp)
}
}
}

func (agg *Aggregator) startRpcServer(ctx context.Context) {
go agg.legacyRpc.StartServer(ctx, 1*time.Second, agg.serverIpPortAddr)
}

func (agg *Aggregator) wait() {
agg.logger.Info("The aggregator is wait to exit")

if agg.legacyRpc != nil {
agg.legacyRpc.Wait()
}

agg.logger.Info("The aggregator is exited")
}

func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
// TODO: check if blsAggServiceResp contains an err
if blsAggServiceResp.Err != nil {
Expand Down Expand Up @@ -195,83 +177,19 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
agg.logger.Info("Threshold reached. Sending aggregated response onchain.",
"taskIndex", blsAggServiceResp.TaskIndex,
)
agg.tasksMu.RLock()
task := agg.tasks[blsAggServiceResp.TaskIndex]
agg.tasksMu.RUnlock()

task := agg.service.GetTaskByIndex(blsAggServiceResp.TaskIndex)

res, err := agg.avsWriter.SendConfirmAlert(context.Background(), task, nonSignerStakesAndSignature)
if err != nil {
agg.logger.Error("Aggregator failed to respond to task", "err", err)
}

agg.finishedTasksMu.Lock()
defer agg.finishedTasksMu.Unlock()

agg.finishedTasks[task.AlertHash] = &FinishedTaskStatus{
agg.service.SetFinishedTask(task.AlertHash, &FinishedTaskStatus{
Message: task,
TxHash: res.TxHash,
BlockHash: res.BlockHash,
BlockNumber: res.BlockNumber,
TransactionIndex: res.TransactionIndex,
}
}

// sendNewTask sends a new task to the task manager contract, and updates the Task dict struct
// with the information of operators opted into quorum 0 at the block of task creation.
func (agg *Aggregator) sendNewTask(alertHash [32]byte, taskIndex types.TaskIndex) (*message.AlertTaskInfo, error) {
agg.logger.Info("Aggregator sending new task", "alert", alertHash, "task", taskIndex)

// TODO: use cfg
quorumNumbersValue := []sdktypes.QuorumNum{0}
quorumThresholdPercentagesValue := []sdktypes.QuorumThresholdPercentage{100}

var err error

var referenceBlockNumber uint64
if referenceBlockNumber, err = agg.ethClient.BlockNumber(context.Background()); err != nil {
return nil, err
}

// the reference block number must < the current block number.
referenceBlockNumber -= 1

agg.logger.Info("get from layer1", "referenceBlockNumber", referenceBlockNumber)

quorumNumbers, err := agg.avsReader.GetQuorumsByBlockNumber(context.Background(), uint32(referenceBlockNumber))
if err != nil {
agg.logger.Error("GetQuorumCountByBlockNumber failed", "err", err)
return nil, err
}

quorumThresholdPercentages, err := agg.avsReader.GetQuorumThresholdPercentages(context.Background(), uint32(referenceBlockNumber), quorumNumbers)
if err != nil {
agg.logger.Error("GetQuorumThresholdPercentages failed", "err", err)
return nil, err
}

agg.logger.Infof("quorum %v %v", quorumNumbers, quorumThresholdPercentages)

newAlertTask := &message.AlertTaskInfo{
AlertHash: alertHash,
QuorumNumbers: quorumNumbersValue,
QuorumThresholdPercentages: quorumThresholdPercentagesValue,
TaskIndex: taskIndex,
ReferenceBlockNumber: referenceBlockNumber,
}

agg.tasksMu.Lock()
agg.tasks[taskIndex] = newAlertTask
agg.tasksMu.Unlock()

// TODO(samlaf): we use seconds for now, but we should ideally pass a blocknumber to the blsAggregationService
// and it should monitor the chain and only expire the task aggregation once the chain has reached that block number.
taskTimeToExpiry := taskChallengeWindowBlock * blockTimeSeconds
agg.blsAggregationService.InitializeNewTask(
taskIndex,
uint32(newAlertTask.ReferenceBlockNumber),
newAlertTask.QuorumNumbers,
newAlertTask.QuorumThresholdPercentages,
taskTimeToExpiry,
)
return newAlertTask, nil
})
}
19 changes: 15 additions & 4 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"

"github.com/urfave/cli"

Expand Down Expand Up @@ -35,9 +38,9 @@ func main() {
}
}

func aggregatorMain(ctx *cli.Context) error {
func aggregatorMain(cliCtx *cli.Context) error {
log.Println("Initializing Aggregator")
config, err := config.NewConfig(ctx)
config, err := config.NewConfig(cliCtx)
if err != nil {
return err
}
Expand All @@ -52,10 +55,18 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

err = agg.Start(context.Background())
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

wg := &sync.WaitGroup{}
wg.Add(1)

err = agg.Start(ctx, wg)
if err != nil {
return err
log.Fatalln("Aggregator run failed", "err", err)
}

wg.Wait()

return nil
}
22 changes: 22 additions & 0 deletions aggregator/rpc/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package rpc

import (
"errors"

"github.com/alt-research/avs/core/message"
)

var (
TaskNotFoundError400 = errors.New("400. Task not found")
OperatorNotPartOfTaskQuorum400 = errors.New("400. Operator not part of quorum")
TaskResponseDigestNotFoundError500 = errors.New("500. Failed to get task response digest")
UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature")
SignatureVerificationFailed400 = errors.New("400. Signature verification failed")
CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices")
)

type AggregatorRpcHandler interface {
InitOperator(req *message.InitOperatorRequest) (*message.InitOperatorResponse, error)
CreateTask(req *message.CreateTaskRequest) (*message.CreateTaskResponse, error)
ProcessSignedTaskResponse(signedTaskResponse *message.SignedTaskRespRequest) (*message.SignedTaskRespResponse, error)
}
Loading

0 comments on commit d7c0eb0

Please sign in to comment.