Skip to content

Commit

Permalink
execution factory constructor updated to take two providers, chainIDs…
Browse files Browse the repository at this point in the history
…, and source token address (#641)

* execution factory constructor updated to take two providers and chain IDs

(cherry picked from commit 6ad1f08)

* Adding source token address to execution factory constructor

(cherry picked from commit 96611a2)
  • Loading branch information
patrickhuie19 committed Sep 25, 2024
1 parent e407ecd commit eac77b0
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 269 deletions.
4 changes: 2 additions & 2 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ type ExecutionFactoryService struct {

// NewExecutionService returns a new [*ExecutionFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPExecProvider) *ExecutionFactoryService {
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain uint32, dstChain uint32, sourceTokenAddress string) *ExecutionFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
plug, ok := instance.(types.CCIPExecutionFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected CCIPExecutionFactoryGenerator but got %T", instance)
}
return plug.NewExecutionFactory(ctx, provider)
return plug.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(srcChain), int64(dstChain), sourceTokenAddress)
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPExecutionService")
Expand Down
12 changes: 9 additions & 3 deletions pkg/loop/ccip_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestExecService(t *testing.T) {

exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
}, cciptest.ExecutionProvider)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
hook := exec.PluginService.XXXTestHook()
servicetest.Run(t, exec)

Expand Down Expand Up @@ -64,7 +64,7 @@ func TestExecService_recovery(t *testing.T) {
Limit: int(limit.Add(1)),
}
return h.New()
}, cciptest.ExecutionProvider)
}, cciptest.ExecutionProvider, cciptest.ExecutionProvider, 0, 0, "")
servicetest.Run(t, exec)

reportingplugintest.RunFactory(t, exec)
Expand Down Expand Up @@ -99,7 +99,13 @@ func TestExecLOOP(t *testing.T) {
assert.Contains(t, err.Error(), "BCF-3061")
if err == nil {
// test to run when BCF-3061 is fixed
cciptest.ExecutionLOOPTester{CCIPExecProvider: remoteProvider}.Run(t, remoteExecFactory)
cciptest.ExecutionLOOPTester{
SrcProvider: remoteProvider,
DstProvider: remoteProvider,
SrcChainID: 0,
DstChainID: 0,
SourceTokenAddress: "",
}.Run(t, remoteExecFactory)
}
})
}
Expand Down
526 changes: 284 additions & 242 deletions pkg/loop/internal/pb/ccip/factories.pb.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/loop/internal/pb/ccip/factories.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ service ExecutionCustomHandlers {

// NewExecutionFactoryRequest is a gRPC adapter to the factory configuration [https://github.com/smartcontractkit/ccip/core/services/ocr2/plugins/ccip/ccipexec/ExecutionPluginStaticConfig]
message NewExecutionFactoryRequest {
uint32 provider_service_id = 1;
uint32 src_provider_service_id = 1;
uint32 dst_provider_service_id = 2;
uint32 src_chain = 3;
uint32 dst_chain = 4;
string src_token_address = 5;
}

// NewExecutionFactoryResponse is a contains the id of the created execution factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@ type execFactoryServer struct {
}

// NewExecutionFactory implements types.CCIPExecFactoryGenerator.
func (e execFactoryServer) NewExecutionFactory(ctx context.Context, provider types.CCIPExecProvider) (types.ReportingPluginFactory, error) {
err := e.provider.Evaluate(ctx, provider)
func (e execFactoryServer) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (types.ReportingPluginFactory, error) {
err := e.provider.Evaluate(ctx, srcProvider)
if err != nil {
return nil, err
}

err2 := e.provider.Evaluate(ctx, dstProvider)
if err2 != nil {
return nil, err2
}
return reportingplugintest.Factory, nil
}

func RunExecutionLOOP(t *testing.T, p types.CCIPExecutionFactoryGenerator) {
ExecutionLOOPTester{ExecutionProvider}.Run(t, p)
ExecutionLOOPTester{SrcProvider: ExecutionProvider, DstProvider: ExecutionProvider, SrcChainID: 0, DstChainID: 0}.Run(t, p)
}

type ExecutionLOOPTester struct {
types.CCIPExecProvider
SrcProvider types.CCIPExecProvider
DstProvider types.CCIPExecProvider
SrcChainID int64
DstChainID int64
SourceTokenAddress string
}

func (e ExecutionLOOPTester) Run(t *testing.T, p types.CCIPExecutionFactoryGenerator) {
t.Run("ExecutionLOOP", func(t *testing.T) {
ctx := tests.Context(t)
factory, err := p.NewExecutionFactory(ctx, e.CCIPExecProvider)
factory, err := p.NewExecutionFactory(ctx, e.SrcProvider, e.DstProvider, e.SrcChainID, e.DstChainID, e.SourceTokenAddress)
require.NoError(t, err)

runExecReportingPluginFactory(t, factory)
Expand Down
60 changes: 45 additions & 15 deletions pkg/loop/internal/reportingplugin/ccip/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,55 @@ func NewExecutionLOOPClient(broker net.Broker, brokerCfg net.BrokerConfig, conn
// is run as an external process via hashicorp plugin. If the given provider is a GRPCClientConn, then the provider is proxied to the
// to the relayer, which is its own process via hashicorp plugin. If the provider is not a GRPCClientConn, then the provider is a local
// to the core node. The core must wrap the provider in a grpc server and serve it locally.
func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, provider types.CCIPExecProvider) (types.ReportingPluginFactory, error) {
func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, srcTokenAddress string) (types.ReportingPluginFactory, error) {
newExecClientFn := func(ctx context.Context) (id uint32, deps net.Resources, err error) {
// TODO are there any local resources that need to be passed to the executor and started as a server?

// the proxyable resources are the Provider, which may or may not be local to the client process. (legacy vs loopp)
var (
providerID uint32
providerResource net.Resource
srcProviderID uint32
srcProviderResource net.Resource
dstProviderID uint32
dstProviderResource net.Resource
)
if grpcProvider, ok := provider.(goplugin.GRPCClientConn); ok {
if srcGrpcProvider, ok := srcProvider.(goplugin.GRPCClientConn); ok {
// TODO: BCF-3061 ccip provider can create new services. the proxying needs to be augmented
// to intercept and route to the created services. also, need to prevent leaks.
providerID, providerResource, err = c.Serve("ExecProvider", proxy.NewProxy(grpcProvider.ClientConn()))
srcProviderID, srcProviderResource, err = c.Serve("ExecProvider", proxy.NewProxy(srcGrpcProvider.ClientConn()))
} else {
// loop client runs in the core node. if the provider is not a grpc client conn, then we are in legacy mode
// and need to serve all the required services locally.
providerID, providerResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, provider, c.BrokerExt)
srcProviderID, srcProviderResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, srcProvider, c.BrokerExt)
})
}
if err != nil {
return 0, nil, err
}
deps.Add(providerResource)
deps.Add(srcProviderResource)

if dstGrpcProvider, ok := dstProvider.(goplugin.GRPCClientConn); ok {
// TODO: BCF-3061 ccip provider can create new services. the proxying needs to be augmented
// to intercept and route to the created services. also, need to prevent leaks.
dstProviderID, dstProviderResource, err = c.Serve("ExecProvider", proxy.NewProxy(dstGrpcProvider.ClientConn()))
} else {
// loop client runs in the core node. if the provider is not a grpc client conn, then we are in legacy mode
// and need to serve all the required services locally.
dstProviderID, dstProviderResource, err = c.ServeNew("ExecProvider", func(s *grpc.Server) {
ccipprovider.RegisterExecutionProviderServices(s, dstProvider, c.BrokerExt)
})
}
if err != nil {
return 0, nil, err
}
deps.Add(dstProviderResource)

resp, err := c.generator.NewExecutionFactory(ctx, &ccippb.NewExecutionFactoryRequest{
ProviderServiceId: providerID,
SrcProviderServiceId: srcProviderID,
DstProviderServiceId: dstProviderID,
SrcChain: uint32(srcChainID),
DstChain: uint32(dstChainID),
SrcTokenAddress: srcTokenAddress,
})
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -115,15 +137,23 @@ func (r *ExecutionLOOPServer) NewExecutionFactory(ctx context.Context, request *
}
}()

// lookup the provider service
providerConn, err := r.Dial(request.ProviderServiceId)
// lookup the source provider service
srcProviderConn, err := r.Dial(request.SrcProviderServiceId)
if err != nil {
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.SrcProviderServiceId, Err: err}
}
deps.Add(net.Resource{Closer: srcProviderConn, Name: "ExecProvider"})
srcProvider := ccipprovider.NewExecProviderClient(r.BrokerExt, srcProviderConn)

// lookup the dest provider service
dstProviderConn, err := r.Dial(request.DstProviderServiceId)
if err != nil {
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.ProviderServiceId, Err: err}
return nil, net.ErrConnDial{Name: "ExecProvider", ID: request.DstProviderServiceId, Err: err}
}
deps.Add(net.Resource{Closer: providerConn, Name: "ExecProvider"})
provider := ccipprovider.NewExecProviderClient(r.BrokerExt, providerConn)
deps.Add(net.Resource{Closer: dstProviderConn, Name: "ExecProvider"})
dstProvider := ccipprovider.NewExecProviderClient(r.BrokerExt, dstProviderConn)

factory, err := r.impl.NewExecutionFactory(ctx, provider)
factory, err := r.impl.NewExecutionFactory(ctx, srcProvider, dstProvider, int64(request.SrcChain), int64(request.DstChain), request.SrcTokenAddress)
if err != nil {
return nil, fmt.Errorf("failed to create new execution factory: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/provider_ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CCIPCommitFactoryGenerator interface {
}

type CCIPExecutionFactoryGenerator interface {
NewExecutionFactory(ctx context.Context, provider CCIPExecProvider) (ReportingPluginFactory, error)
NewExecutionFactory(ctx context.Context, srcProvider CCIPExecProvider, dstProvider CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (ReportingPluginFactory, error)
}
type CCIPFactoryGenerator interface {
CCIPCommitFactoryGenerator
Expand Down

0 comments on commit eac77b0

Please sign in to comment.