Skip to content

Commit

Permalink
parse filters and filter results
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Feb 13, 2025
1 parent 7377a21 commit 463af83
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 12 deletions.
5 changes: 4 additions & 1 deletion cmd/debugger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ func main() {
GenesisBlockNumber: cfg.GetGenesisBlockNumber(),
}, cfg, mds, p, sm, msm, rc, rcq, rps, l, client)

rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
GrpcPort: cfg.RpcConfig.GrpcPort,
HttpPort: cfg.RpcConfig.HttpPort,
}, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg)
if err != nil {
l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err))
}

// RPC channel to notify the RPC server to shutdown gracefully
rpcChannel := make(chan bool)
Expand Down
6 changes: 5 additions & 1 deletion cmd/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ var rpcCmd = &cobra.Command{
l.Sugar().Fatalw("Failed to create sidecar client", zap.Error(err))
}

rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
GrpcPort: cfg.RpcConfig.GrpcPort,
HttpPort: cfg.RpcConfig.HttpPort,
}, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg)

if err != nil {
l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err))
}

// RPC channel to notify the RPC server to shutdown gracefully
rpcChannel := make(chan bool)
if err := rpc.Start(ctx, rpcChannel); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ var runCmd = &cobra.Command{
GenesisBlockNumber: cfg.GetGenesisBlockNumber(),
}, cfg, mds, p, sm, msm, rc, rcq, rps, l, client)

rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
rpc, err := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{
GrpcPort: cfg.RpcConfig.GrpcPort,
HttpPort: cfg.RpcConfig.HttpPort,
}, mds, rc, rcq, eb, rps, pds, rds, scc, l, cfg)
if err != nil {
l.Sugar().Fatalw("Failed to create rpc server", zap.Error(err))
}

// RPC channel to notify the RPC server to shutdown gracefully
rpcChannel := make(chan bool)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13
github.com/Layr-Labs/protocol-apis v1.7.0
github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf
github.com/ethereum/go-ethereum v1.14.13
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1
github.com/google/uuid v1.6.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQA
github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4=
github.com/Layr-Labs/protocol-apis v1.7.0 h1:BYzfF0mLjil3VAq8EEnswJbOL9v0OqfL32TipUryZ2s=
github.com/Layr-Labs/protocol-apis v1.7.0/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ=
github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213185609-9608503ad4ed h1:dWv+54pLMoDWTPiVWatfFvyv5tLlkyuK01KBAvW4ieY=
github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213185609-9608503ad4ed/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ=
github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf h1:0ajrteTXg+E6jrkd9bPJkr8LvUTCpzqXUq7fFVPMGL4=
github.com/Layr-Labs/protocol-apis v1.7.1-0.20250213193904-02e6c9a33dbf/go.mod h1:zCirDItAbrnEv1kV1RTccY7eVSg0+da4/dFCXHyLNZQ=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
Expand Down
85 changes: 79 additions & 6 deletions pkg/rpcServer/eventHandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package rpcServer

import (
"context"
"encoding/json"
v1EigenState "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/eigenState"
v1EthereumTypes "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/ethereumTypes"
v1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/events"
"github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager"
"github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes"
"github.com/Layr-Labs/sidecar/pkg/eventFilter"
"github.com/Layr-Labs/sidecar/pkg/storage"
"github.com/google/uuid"
"go.uber.org/zap"
Expand All @@ -15,7 +17,11 @@ import (
"io"
)

func (rpc *RpcServer) subscribeToBlocks(ctx context.Context, requestId string, handleBlock func(interface{}) error) error {
func (rpc *RpcServer) subscribeToBlocks(
ctx context.Context,
requestId string,
handleBlock func(interface{}) error,
) error {
consumer := &eventBusTypes.Consumer{
Id: eventBusTypes.ConsumerId(requestId),
Context: ctx,
Expand All @@ -39,6 +45,22 @@ func (rpc *RpcServer) subscribeToBlocks(ctx context.Context, requestId string, h
}
}

func filterEigenStateChanges(changes map[string][]interface{}, filter *eventFilter.And, filterRegistry *eventFilter.FilterableRegistry) (map[string][]interface{}, error) {
for modelName, modelChanges := range changes {
for i, change := range modelChanges {
match, err := filter.Evaluate(change, filterRegistry)
if err != nil {
return nil, err
}
if !match {
// remove the item from the list
changes[modelName] = append(changes[modelName][:i], changes[modelName][i+1:]...)
}
}
}
return changes, nil
}

func (rpc *RpcServer) StreamEigenStateChanges(request *v1.StreamEigenStateChangesRequest, g grpc.ServerStreamingServer[v1.StreamEigenStateChangesResponse]) error {
// Since this rpc sidecar is not processing blocks, we need to connect to the primary sidecar to get the events
if !rpc.globalConfig.SidecarPrimaryConfig.IsPrimary {
Expand Down Expand Up @@ -68,8 +90,27 @@ func (rpc *RpcServer) StreamEigenStateChanges(request *v1.StreamEigenStateChange
return err
}

var filter *eventFilter.And
filterString := request.GetStateChangeFilter()
if filterString != "" {
err := json.Unmarshal([]byte(filterString), &filter)
if err != nil {
rpc.Logger.Sugar().Errorw("Failed to unmarshal filter",
zap.Error(err),
)
return err
}
}

err = rpc.subscribeToBlocks(g.Context(), requestId.String(), func(data interface{}) error {
blockProcessedData := data.(*eventBusTypes.BlockProcessedData)

if filter != nil {
blockProcessedData.CommittedState, err = filterEigenStateChanges(blockProcessedData.CommittedState, filter, rpc.filterRegistry)
if err != nil {
return err
}
}
changes, err := rpc.parseCommittedChanges(blockProcessedData.CommittedState)
if err != nil {
return err
Expand Down Expand Up @@ -112,23 +153,55 @@ func (rpc *RpcServer) StreamIndexedBlocks(request *v1.StreamIndexedBlocksRequest
}
onlyBlocksWithData := request.GetOnlyBlocksWithData()

var stateChangesFilter *eventFilter.And
var blockFilter *eventFilter.And

filters := request.GetFilters()
if filters != nil {
stateChangeFilterStr := filters.GetStateChangeFilter()
if stateChangeFilterStr != "" {
err := json.Unmarshal([]byte(stateChangeFilterStr), &stateChangesFilter)
if err != nil {
rpc.Logger.Sugar().Errorw("Failed to unmarshal state changes filter",
zap.Error(err),
)
return err
}
}
blockFilterStr := filters.GetBlockFilter()
if blockFilterStr != "" {
err := json.Unmarshal([]byte(blockFilterStr), &blockFilter)
if err != nil {
rpc.Logger.Sugar().Errorw("Failed to unmarshal block filter",
zap.Error(err),
)
return err
}
}
}

err = rpc.subscribeToBlocks(g.Context(), requestId.String(), func(data interface{}) error {
rpc.Logger.Debug("Received block", zap.Any("data", data))
blockProcessedData := data.(*eventBusTypes.BlockProcessedData)

if onlyBlocksWithData && processedBlockHasData(blockProcessedData) {
if (onlyBlocksWithData && processedBlockHasData(blockProcessedData)) || !onlyBlocksWithData {
if stateChangesFilter != nil {
blockProcessedData.CommittedState, err = filterEigenStateChanges(blockProcessedData.CommittedState, stateChangesFilter, rpc.filterRegistry)
if err != nil {
return err
}
}

resp, err := rpc.buildBlockResponse(blockProcessedData, request.GetIncludeStateChanges())
if err != nil {
return err
}

return g.SendMsg(resp)
}
return nil
})
if err != nil {
return err
}
return nil
return err
}

func processedBlockHasData(block *eventBusTypes.BlockProcessedData) bool {
Expand Down
14 changes: 12 additions & 2 deletions pkg/rpcServer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/Layr-Labs/sidecar/internal/logger"
sidecarClient "github.com/Layr-Labs/sidecar/pkg/clients/sidecar"
"github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes"
"github.com/Layr-Labs/sidecar/pkg/eventFilter"
"github.com/Layr-Labs/sidecar/pkg/eventFilter/eventTypeRegistry"
"github.com/Layr-Labs/sidecar/pkg/proofs"
"github.com/Layr-Labs/sidecar/pkg/rewards"
"github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue"
Expand Down Expand Up @@ -48,6 +50,7 @@ type RpcServer struct {
rewardsDataService *rewardsDataService.RewardsDataService
globalConfig *config.Config
sidecarClient *sidecarClient.SidecarClient
filterRegistry *eventFilter.FilterableRegistry
}

func NewRpcServer(
Expand All @@ -62,7 +65,7 @@ func NewRpcServer(
scc *sidecarClient.SidecarClient,
l *zap.Logger,
cfg *config.Config,
) *RpcServer {
) (*RpcServer, error) {
server := &RpcServer{
rpcConfig: config,
blockStore: bs,
Expand All @@ -77,7 +80,14 @@ func NewRpcServer(
sidecarClient: scc,
}

return server
reg, err := eventTypeRegistry.BuildFilterableEventRegistry()
if err != nil {
l.Sugar().Errorw("Failed to build filterable event registry", zap.Error(err))
return nil, err
}
server.filterRegistry = reg

return server, nil
}

func (s *RpcServer) registerHandlers(ctx context.Context, grpcServer *grpc.Server, mux *runtime.ServeMux) error {
Expand Down

0 comments on commit 463af83

Please sign in to comment.