Skip to content

Commit

Permalink
feat: Add grpc aggregator support (#90)
Browse files Browse the repository at this point in the history
* delete useless codes

* add proto

* fix lint and bugs

* add ignore

* impl the rpc by grpc

* add operator grpc client impl
  • Loading branch information
fyInALT authored Apr 5, 2024
1 parent 927bb3f commit de9f2ac
Show file tree
Hide file tree
Showing 22 changed files with 1,595 additions and 138 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ coverage.html
logs.txt

bin
contracts/broadcast
contracts/broadcast
.vscode
20 changes: 20 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ CHAINID=31337
STRATEGY_ADDRESS=0x7a2088a1bFc9d81c55368AE168C2C02570cB814F
DEPLOYMENT_FILES_DIR=contracts/script/output/${CHAINID}

PROTOS := ./api/proto
PROTO_GEN := ./api/grpc

-----------------------------: ##

___CONTRACTS___: ##
Expand All @@ -25,6 +28,22 @@ bindings: ## generates contract bindings

__CLI__: ##

clean:
find $(PROTO_GEN) -name "*.pb.go" -type f | xargs rm -rf
mkdir -p $(PROTO_GEN)

protoc: clean
protoc -I $(PROTOS) \
--go_out=$(PROTO_GEN) \
--go_opt=paths=source_relative \
--go-grpc_out=$(PROTO_GEN) \
--go-grpc_opt=paths=source_relative \
$(PROTOS)/**/*.proto

lint:
staticcheck ./...
golangci-lint run

build: build-operator build-aggregator build-cli

build-operator:
Expand All @@ -40,3 +59,4 @@ _____HELPER_____: ##
mocks: ## generates mocks for tests
go install go.uber.org/mock/[email protected]
go generate ./...

35 changes: 27 additions & 8 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
// this hardcoded here because it's also hardcoded in the contracts, but should
// ideally be fetched from the contracts
taskChallengeWindowBlock = 100
blockTimeSeconds = 12 * time.Second
blockTimeDuration = 12 * time.Second
avsName = "mach"
)

Expand Down Expand Up @@ -79,11 +79,14 @@ type OperatorStatus struct {
type Aggregator struct {
logger logging.Logger

serverIpPortAddr string
avsWriter chainio.AvsWriterer
serverIpPortAddr string
grpcServerIpPortAddr string

avsWriter chainio.AvsWriterer

service *AggregatorService
legacyRpc *rpc.LegacyRpcHandler
gRpc *rpc.GRpcHandler
}

// NewAggregator creates a new Aggregator with the provided config.
Expand All @@ -102,12 +105,20 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {

legacyRpc := rpc.NewLegacyRpcHandler(c.Logger, service)

var grpc_server *rpc.GRpcHandler
if c.AggregatorGRPCServerIpPortAddr != "" {
c.Logger.Infof("Create grpc server in %s", c.AggregatorGRPCServerIpPortAddr)
grpc_server = rpc.NewGRpcHandler(c.Logger, service)
}

return &Aggregator{
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
avsWriter: avsWriter,
service: service,
legacyRpc: legacyRpc,
logger: c.Logger,
serverIpPortAddr: c.AggregatorServerIpPortAddr,
grpcServerIpPortAddr: c.AggregatorGRPCServerIpPortAddr,
avsWriter: avsWriter,
service: service,
legacyRpc: legacyRpc,
gRpc: grpc_server,
}, nil
}

Expand Down Expand Up @@ -136,6 +147,10 @@ func (agg *Aggregator) Start(ctx context.Context, wg *sync.WaitGroup) error {

func (agg *Aggregator) startRpcServer(ctx context.Context) {
go agg.legacyRpc.StartServer(ctx, 1*time.Second, agg.serverIpPortAddr)

if agg.gRpc != nil {
go agg.gRpc.StartServer(ctx, agg.grpcServerIpPortAddr)
}
}

func (agg *Aggregator) wait() {
Expand All @@ -145,6 +160,10 @@ func (agg *Aggregator) wait() {
agg.legacyRpc.Wait()
}

if agg.gRpc != nil {
agg.gRpc.Wait()
}

agg.logger.Info("The aggregator is exited")
}

Expand Down
110 changes: 110 additions & 0 deletions aggregator/rpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package rpc

import (
"context"
"fmt"
"net"
"sync"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/alt-research/avs/api/grpc/aggregator"
"github.com/alt-research/avs/core/message"
"google.golang.org/grpc"
)

type GRpcHandler struct {
aggregator.UnimplementedAggregatorServer
logger logging.Logger
aggreagtor AggregatorRpcHandler
wg *sync.WaitGroup
}

func NewGRpcHandler(logger logging.Logger, aggreagtor AggregatorRpcHandler) *GRpcHandler {
return &GRpcHandler{
logger: logger,
aggreagtor: aggreagtor,
wg: &sync.WaitGroup{},
}
}

func (s *GRpcHandler) StartServer(ctx context.Context, serverIpPortAddr string) {
s.logger.Info("Start GRpcServer", "addr", serverIpPortAddr)

lis, err := net.Listen("tcp", serverIpPortAddr)
if err != nil {
s.logger.Fatalf("GRpcServer failed to listen: %v", err)
}

server := grpc.NewServer()
aggregator.RegisterAggregatorServer(server, s)

serverErr := make(chan error, 1)

s.wg.Add(1)
go func() {
defer s.wg.Done()
serverErr <- server.Serve(lis)
}()

select {
case <-ctx.Done():
s.logger.Info("Stop GRpcServer by Done")
server.Stop()
case err = <-serverErr:
}

if err != nil {
s.logger.Error("GRpcServer serve stopped by error", "err", err)
} else {
s.logger.Info("GRpcServer serve stopped")
}
}

func (s *GRpcHandler) Wait() {
s.wg.Wait()
}

// Send Init operator to aggregator from operator, will check if the config is matching
func (s *GRpcHandler) InitOperator(ctx context.Context, req *aggregator.InitOperatorRequest) (*aggregator.InitOperatorResponse, error) {
msg, err := message.NewInitOperatorRequest(req)
if err != nil {
return nil, fmt.Errorf("initOperator message convert error: %v", err.Error())
}

resp, err := s.aggreagtor.InitOperator(msg)
if err != nil {
return nil, fmt.Errorf("initOperator handler error: %v", err.Error())
}

return resp.ToPbType(), nil
}

// Create a alert task
func (s *GRpcHandler) CreateTask(ctx context.Context, req *aggregator.CreateTaskRequest) (*aggregator.CreateTaskResponse, error) {
msg, err := message.NewCreateTaskRequest(req)
if err != nil {
return nil, fmt.Errorf("createTask message convert error: %v", err.Error())
}

resp, err := s.aggreagtor.CreateTask(msg)
if err != nil {
return nil, fmt.Errorf("createTask handler error: %v", err.Error())
}

return resp.ToPbType(), nil
}

// Send signed task for alert
func (s *GRpcHandler) ProcessSignedTaskResponse(ctx context.Context, req *aggregator.SignedTaskRespRequest) (*aggregator.SignedTaskRespResponse, error) {
msg, err := message.NewSignedTaskRespRequest(req)
if err != nil {
return nil, fmt.Errorf("processSignedTaskResponse message convert error: %v", err.Error())
}

resp, err := s.aggreagtor.ProcessSignedTaskResponse(msg)
if err != nil {
return nil, fmt.Errorf("processSignedTaskResponse handler error: %v", err.Error())
}

return resp.ToPbType(), nil
}
11 changes: 0 additions & 11 deletions aggregator/rpc/handler.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
package rpc

import (
"errors"

"github.com/alt-research/avs/core/message"
)

var (
TaskNotFoundError400 = errors.New("400. Task not found")
OperatorNotPartOfTaskQuorum400 = errors.New("400. Operator not part of quorum")
TaskResponseDigestNotFoundError500 = errors.New("500. Failed to get task response digest")
UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature")
SignatureVerificationFailed400 = errors.New("400. Signature verification failed")
CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices")
)

type AggregatorRpcHandler interface {
InitOperator(req *message.InitOperatorRequest) (*message.InitOperatorResponse, error)
CreateTask(req *message.CreateTaskRequest) (*message.CreateTaskResponse, error)
Expand Down
30 changes: 12 additions & 18 deletions aggregator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package aggregator

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys"
sdktypes "github.com/Layr-Labs/eigensdk-go/types"

"github.com/alt-research/avs/aggregator/rpc"
"github.com/alt-research/avs/aggregator/types"
"github.com/alt-research/avs/core/chainio"
Expand All @@ -23,15 +22,6 @@ import (
"github.com/ethereum/go-ethereum/common"
)

var (
TaskNotFoundError400 = errors.New("400. Task not found")
OperatorNotPartOfTaskQuorum400 = errors.New("400. Operator not part of quorum")
TaskResponseDigestNotFoundError500 = errors.New("500. Failed to get task response digest")
UnknownErrorWhileVerifyingSignature400 = errors.New("400. Failed to verify signature")
SignatureVerificationFailed400 = errors.New("400. Signature verification failed")
CallToGetCheckSignaturesIndicesFailed500 = errors.New("500. Failed to get check signatures indices")
)

type AggregatorService struct {
logger logging.Logger
cfg *config.Config
Expand Down Expand Up @@ -66,7 +56,7 @@ func NewAggregatorService(c *config.Config) (*AggregatorService, error) {
AvsName: avsName,
PromMetricsIpPortAddress: ":9090",
}
clients, err := clients.BuildAll(chainioConfig, c.PrivateKey, c.Logger)
clients, err := sdkclients.BuildAll(chainioConfig, c.PrivateKey, c.Logger)
if err != nil {
c.Logger.Errorf("Cannot create sdk clients", "err", err)
return nil, err
Expand Down Expand Up @@ -107,14 +97,14 @@ func (agg *AggregatorService) GetTaskByIndex(taskIndex types.TaskIndex) *message
agg.tasksMu.RLock()
defer agg.tasksMu.RUnlock()

res, _ := agg.tasks[taskIndex]
res := agg.tasks[taskIndex]

return res
}

func (agg *AggregatorService) newIndex() types.TaskIndex {
agg.tasksMu.Lock()
defer agg.tasksMu.Unlock()
agg.nextTaskIndexMu.Lock()
defer agg.nextTaskIndexMu.Unlock()

res := agg.nextTaskIndex
agg.nextTaskIndex += 1
Expand Down Expand Up @@ -188,7 +178,7 @@ func (agg *AggregatorService) CreateTask(req *message.CreateTaskRequest) (*messa

finished := agg.GetFinishedTaskByAlertHash(req.AlertHash)
if finished != nil {
return nil, fmt.Errorf("The task 0x%x already finished: 0x%x", req.AlertHash, finished.TxHash)
return nil, fmt.Errorf("the task 0x%x already finished: 0x%x", req.AlertHash, finished.TxHash)
}

task := agg.GetTaskByAlertHash(req.AlertHash)
Expand Down Expand Up @@ -295,15 +285,19 @@ func (agg *AggregatorService) sendNewTask(alertHash [32]byte, taskIndex types.Ta

// TODO(samlaf): we use seconds for now, but we should ideally pass a blocknumber to the blsAggregationService
// and it should monitor the chain and only expire the task aggregation once the chain has reached that block number.
taskTimeToExpiry := taskChallengeWindowBlock * blockTimeSeconds
taskTimeToExpiry := taskChallengeWindowBlock * blockTimeDuration

agg.logger.Infof("InitializeNewTask %v %v", taskIndex, taskTimeToExpiry)
agg.blsAggregationService.InitializeNewTask(
err = agg.blsAggregationService.InitializeNewTask(
taskIndex,
uint32(newAlertTask.ReferenceBlockNumber),
newAlertTask.QuorumNumbers,
newAlertTask.QuorumThresholdPercentages,
taskTimeToExpiry,
)
if err != nil {
agg.logger.Error("InitializeNewTask failed", "err", err)
return nil, err
}
return newAlertTask, nil
}
Loading

0 comments on commit de9f2ac

Please sign in to comment.