Skip to content

Commit

Permalink
Bcf 3040 ccip test scaffold (#401)
Browse files Browse the repository at this point in the history
* wip

* progress serving on/off ramp reader

* working on exec test

* exec service test working

* tests working for embedded relayer

* update TODOs with jira reference, linter

* clean up

* add tickets to all TODOS, fix comments

* fix merge
  • Loading branch information
krehermann authored Mar 14, 2024
1 parent 049609b commit 03b543a
Show file tree
Hide file tree
Showing 45 changed files with 2,262 additions and 230 deletions.
93 changes: 93 additions & 0 deletions pkg/loop/ccip_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package loop

import (
"context"
"fmt"
"os/exec"

"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

// CCIPExecutionLOOPName is the name for [types.CCIPExecutionFactoryGenerator]/[NewExecutionLOOP].
const CCIPExecutionLOOPName = "ccip_execution"

func PluginCCIPExecutionHandshakeConfig() plugin.HandshakeConfig {
return plugin.HandshakeConfig{
MagicCookieKey: "CL_PLUGIN_CCIP_EXEC_MAGIC_COOKIE",
MagicCookieValue: "5a2d1527-6c0f-4c7e-8c96-00aa4bececd2",
}
}

type ExecutionLoop struct {
plugin.NetRPCUnsupportedPlugin

BrokerConfig

PluginServer types.CCIPExecutionFactoryGenerator

pluginClient *internal.ExecutionLOOPClient
}

func (p *ExecutionLoop) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error {
return internal.RegisterExecutionLOOPServer(server, broker, p.BrokerConfig, p.PluginServer)
}

// GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.CCIPExecutionFactoryGenerator], updated with the new broker and conn.
func (p *ExecutionLoop) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) {
if p.pluginClient == nil {
p.pluginClient = internal.NewExecutionLOOPClient(broker, p.BrokerConfig, conn)
} else {
p.pluginClient.Refresh(broker, conn)
}

return types.CCIPExecutionFactoryGenerator(p.pluginClient), nil
}

func (p *ExecutionLoop) ClientConfig() *plugin.ClientConfig {
return &plugin.ClientConfig{
HandshakeConfig: PluginCCIPExecutionHandshakeConfig(),
Plugins: map[string]plugin.Plugin{CCIPExecutionLOOPName: p},
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
GRPCDialOptions: p.DialOpts,
Logger: HCLogLogger(p.Logger),
}
}

var _ ocrtypes.ReportingPluginFactory = (*ExecutionFactoryService)(nil)

// ExecutionFactoryService is a [types.Service] that maintains an internal [types.CCIPExecutionFactoryGenerator].
type ExecutionFactoryService struct {
internal.PluginService[*ExecutionLoop, types.ReportingPluginFactory]
}

// NewExecutionService returns a new [*ExecutionFactoryService].
// cmd must return a new exec.Cmd each time it is called.
func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPExecProvider, config types.CCIPExecFactoryGeneratorConfig) *ExecutionFactoryService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
plug, ok := instance.(types.CCIPExecutionFactoryGenerator)
if !ok {
return nil, fmt.Errorf("expected PluginMedian but got %T", instance)
}
return plug.NewExecutionFactory(ctx, provider)
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "CCIPExecutionService")
var efs ExecutionFactoryService
broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
efs.Init(CCIPExecutionLOOPName, &ExecutionLoop{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
return &efs
}

func (m *ExecutionFactoryService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
if err := m.Wait(); err != nil {
return nil, ocrtypes.ReportingPluginInfo{}, err
}
return m.Service.NewReportingPlugin(config)
}
124 changes: 124 additions & 0 deletions pkg/loop/ccip_execution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package loop_test

import (
"context"
"os/exec"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/go-plugin"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal"
ccip_test "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/ccip/test"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test"
testcore "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test/core"
testreportingplugin "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test/ocr2/reporting_plugin"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

func TestExecService(t *testing.T) {
t.Parallel()

exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
}, ccip_test.ExecutionProvider, ccip_test.ExecutionConfig)
hook := exec.PluginService.XXXTestHook()
servicetest.Run(t, exec)

t.Run("control", func(t *testing.T) {
testreportingplugin.RunFactory(t, exec)
})

t.Run("Kill", func(t *testing.T) {
hook.Kill()

// wait for relaunch
time.Sleep(2 * internal.KeepAliveTickDuration)

testreportingplugin.RunFactory(t, exec)
})

t.Run("Reset", func(t *testing.T) {
hook.Reset()

// wait for relaunch
time.Sleep(2 * internal.KeepAliveTickDuration)

testreportingplugin.RunFactory(t, exec)
})
}

func TestExecService_recovery(t *testing.T) {
t.Parallel()
var limit atomic.Int32
exec := loop.NewExecutionService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
h := HelperProcessCommand{
Command: loop.CCIPExecutionLOOPName,
Limit: int(limit.Add(1)),
}
return h.New()
}, ccip_test.ExecutionProvider, ccip_test.ExecutionConfig)
servicetest.Run(t, exec)

testreportingplugin.RunFactory(t, exec)
}

func TestExecLOOP(t *testing.T) {
// launch the exec loop via the main program
t.Parallel()
stopCh := newStopCh(t)
exec := loop.ExecutionLoop{BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}
cc := exec.ClientConfig()
cc.Cmd = NewHelperProcessCommand(loop.CCIPExecutionLOOPName, false, 0)
c := plugin.NewClient(cc)
// make to kill the exec loop
t.Cleanup(c.Kill)
client, err := c.Client()
require.NoError(t, err)
defer client.Close()
require.NoError(t, client.Ping())
// get a concrete instance of the exec loop
instance, err := client.Dispense(loop.CCIPExecutionLOOPName)
remoteExecFactory := instance.(types.CCIPExecutionFactoryGenerator)
require.NoError(t, err)

ccip_test.RunExecutionLOOP(t, remoteExecFactory)

t.Run("proxy: exec loop <--> relayer loop", func(t *testing.T) {
// launch the relayer as external process via the main program
pr := newPluginRelayerExec(t, false, stopCh)
remoteProvider, err := newExecutionProvider(t, pr)
require.Error(t, err, "expected error")
assert.Contains(t, err.Error(), "BCF-3061")
if err == nil {
// test to run when BCF-3061 is fixed
ccip_test.ExecutionLOOPTester{CCIPExecProvider: remoteProvider}.Run(t, remoteExecFactory)
}
})
}

func newExecutionProvider(t *testing.T, pr loop.PluginRelayer) (types.CCIPExecProvider, error) {
ctx := context.Background()
r, err := pr.NewRelayer(ctx, test.ConfigTOML, testcore.Keystore)
require.NoError(t, err)
servicetest.Run(t, r)

// TODO: fix BCF-3061. we expect an error here until then.
p, err := r.NewPluginProvider(ctx, ccip_test.ExecutionRelayArgs, ccip_test.ExecutionPluginArgs)
if err != nil {
return nil, err
}
// TODO: this shouldn't run until BCF-3061 is fixed
require.NoError(t, err)
execProvider, ok := p.(types.CCIPExecProvider)
require.True(t, ok, "got %T", p)
servicetest.Run(t, execProvider)
return execProvider, nil
}
2 changes: 1 addition & 1 deletion pkg/loop/internal/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Broker interface {

var _ Broker = (*atomicBroker)(nil)

// An atomicBroker implements [Broker] and is backed by a swappable [*plugin.GRPCBroker]
// An atomicBroker implements [Broker] and is backed by a swappable [*plugin.GRPCBroker].
type atomicBroker struct {
broker atomic.Pointer[Broker]
}
Expand Down
1 change: 0 additions & 1 deletion pkg/loop/internal/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"

"google.golang.org/grpc"

"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand Down
22 changes: 8 additions & 14 deletions pkg/loop/internal/capabilities_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (cr *capabilitiesRegistryClient) Add(ctx context.Context, c capabilities.Ba
return err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(c, info.CapabilityType)
if err != nil {
return err
Expand All @@ -143,7 +143,6 @@ func (cr *capabilitiesRegistryClient) Add(ctx context.Context, c capabilities.Ba
id, cRes, err := cr.ServeNew(info.ID, func(s *grpc.Server) {
pbRegisterCapability(s, cr.BrokerExt, c, info.CapabilityType)
})

if err != nil {
return err
}
Expand Down Expand Up @@ -182,7 +181,7 @@ func (c *capabilitiesRegistryServer) Get(ctx context.Context, request *pb.GetReq
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(capability, info.CapabilityType)
if err != nil {
return nil, err
Expand All @@ -191,7 +190,6 @@ func (c *capabilitiesRegistryServer) Get(ctx context.Context, request *pb.GetReq
id, _, err := c.ServeNew("Get", func(s *grpc.Server) {
pbRegisterCapability(s, c.BrokerExt, capability, info.CapabilityType)
})

if err != nil {
return nil, err
}
Expand All @@ -208,7 +206,7 @@ func (c *capabilitiesRegistryServer) GetTrigger(ctx context.Context, request *pb
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(capability, capabilities.CapabilityTypeTrigger)
if err != nil {
return nil, err
Expand All @@ -217,7 +215,6 @@ func (c *capabilitiesRegistryServer) GetTrigger(ctx context.Context, request *pb
id, _, err := c.ServeNew("GetTrigger", func(s *grpc.Server) {
pbRegisterCapability(s, c.BrokerExt, capability, capabilities.CapabilityTypeTrigger)
})

if err != nil {
return nil, err
}
Expand All @@ -233,7 +230,7 @@ func (c *capabilitiesRegistryServer) GetAction(ctx context.Context, request *pb.
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(capability, capabilities.CapabilityTypeAction)
if err != nil {
return nil, err
Expand All @@ -242,7 +239,6 @@ func (c *capabilitiesRegistryServer) GetAction(ctx context.Context, request *pb.
id, _, err := c.ServeNew("GetAction", func(s *grpc.Server) {
pbRegisterCapability(s, c.BrokerExt, capability, capabilities.CapabilityTypeAction)
})

if err != nil {
return nil, err
}
Expand All @@ -258,7 +254,7 @@ func (c *capabilitiesRegistryServer) GetConsensus(ctx context.Context, request *
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(capability, capabilities.CapabilityTypeConsensus)
if err != nil {
return nil, err
Expand All @@ -267,7 +263,6 @@ func (c *capabilitiesRegistryServer) GetConsensus(ctx context.Context, request *
id, _, err := c.ServeNew("GetConsensus", func(s *grpc.Server) {
pbRegisterCapability(s, c.BrokerExt, capability, capabilities.CapabilityTypeConsensus)
})

if err != nil {
return nil, err
}
Expand All @@ -283,7 +278,7 @@ func (c *capabilitiesRegistryServer) GetTarget(ctx context.Context, request *pb.
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(capability, capabilities.CapabilityTypeTarget)
if err != nil {
return nil, err
Expand All @@ -292,7 +287,6 @@ func (c *capabilitiesRegistryServer) GetTarget(ctx context.Context, request *pb.
id, _, err := c.ServeNew("GetTarget", func(s *grpc.Server) {
pbRegisterCapability(s, c.BrokerExt, capability, capabilities.CapabilityTypeTarget)
})

if err != nil {
return nil, err
}
Expand All @@ -318,7 +312,7 @@ func (c *capabilitiesRegistryServer) List(ctx context.Context, _ *emptypb.Empty)
return nil, err
}

//Check the capability and the CapabilityType match here as the ServeNew method does not return an error
// Check the capability and the CapabilityType match here as the ServeNew method does not return an error
err = validateCapability(cap, info.CapabilityType)
if err != nil {
c.CloseAll(resources...)
Expand Down Expand Up @@ -396,7 +390,7 @@ func validateCapability(impl capabilities.BaseCapability, t capabilities.Capabil
}

// pbRegisterCapability registers the server with the correct capability based on capability type, this method assumes
// that the capability has already been validated with validateCapability
// that the capability has already been validated with validateCapability.
func pbRegisterCapability(s *grpc.Server, b *BrokerExt, impl capabilities.BaseCapability, t capabilities.CapabilityType) {
switch t {
case capabilities.CapabilityTypeTrigger:
Expand Down
Loading

0 comments on commit 03b543a

Please sign in to comment.