Skip to content

Commit

Permalink
isolate fix to mercury; more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Dec 10, 2024
1 parent b2f7237 commit 203ddff
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 18 deletions.
57 changes: 44 additions & 13 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mercury

import (
"context"
"encoding/json"
"fmt"
"os/exec"
Expand Down Expand Up @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loop mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand All @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
loopID := mercuryLggr.Name()
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
ID: loopID,
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil
}

// loopUnregisterCloser is a helper to unregister a loop
// as a service
// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern
// perhaps it can be implemented in the delegate on job delete.
type loopUnregisterCloser struct {
r plugins.RegistrarConfig
id string
}

func (l *loopUnregisterCloser) Close() error {
l.r.UnregisterLOOP(l.id)
return nil
}

func (l *loopUnregisterCloser) Start(ctx context.Context) error {
return nil
}

func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser {
return &loopUnregisterCloser{
r: r,
id: id,
}
return cmdFn, opts, mercuryLggr, nil
}
92 changes: 88 additions & 4 deletions core/services/ocr2/plugins/mercury/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -22,6 +23,7 @@ import (
v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"
v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
v4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury"

Expand Down Expand Up @@ -92,15 +94,15 @@ var (

// this is kind of gross, but it's the best way to test return values of the services
expectedEmbeddedServiceCnt = 3
expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 1
expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 2 // factory server and loop unregisterer
)

func TestNewServices(t *testing.T) {
type args struct {
pluginConfig job.JSONConfig
feedID utils.FeedID
}
tests := []struct {
testCases := []struct {
name string
args args
loopMode bool
Expand Down Expand Up @@ -198,7 +200,7 @@ func TestNewServices(t *testing.T) {
wantLoopFactory: &loop.MercuryV4Service{},
},
}
for _, tt := range tests {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
if tt.loopMode {
t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd")
Expand All @@ -222,6 +224,88 @@ func TestNewServices(t *testing.T) {
}
})
}

t.Run("restartable loop", func(t *testing.T) {
// setup a real loop registry to test restartability
registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "")
loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister)
prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig)
type args struct {
pluginConfig job.JSONConfig
feedID utils.FeedID
cfg mercuryocr2.Config
}
testCases := []struct {
name string
args args
wantErr bool
}{
{
name: "v1 loop",
args: args{
pluginConfig: v1jsonCfg,
feedID: v1FeedId,
cfg: prodCfg,
},
wantErr: false,
},
{
name: "v2 loop",
args: args{
pluginConfig: v2jsonCfg,
feedID: v2FeedId,
cfg: prodCfg,
},
wantErr: false,
},
{
name: "v3 loop",
args: args{
pluginConfig: v3jsonCfg,
feedID: v3FeedId,
cfg: prodCfg,
},
wantErr: false,
},
{
name: "v4 loop",
args: args{
pluginConfig: v4jsonCfg,
feedID: v4FeedId,
cfg: prodCfg,
},
wantErr: false,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd")
assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get())

got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg)

Check failure on line 286 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

too many arguments in call to newServicesTestWrapper

Check failure on line 286 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / lint

too many arguments in call to newServicesTestWrapper

Check failure on line 286 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

too many arguments in call to newServicesTestWrapper

Check failure on line 286 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Find New Flaky Tests In Chainlink Project / Run Tests (github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury, ubuntu-la...

too many arguments in call to newServicesTestWrapper

Check failure on line 286 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

too many arguments in call to newServicesTestWrapper
if (err != nil) != tt.wantErr {
t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr)
return
}
// hack to simulate a restart. we don't have enough boilerplate to start the oracle service
// only care about the subservices so we start all except the oracle, which happens to be the last one
for i := 0; i < len(got)-1; i++ {
require.NoError(t, got[i].Start(tests.Context(t)))
}
// if we don't close the services, we get conflicts with the loop registry
_, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg)

Check failure on line 297 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

too many arguments in call to newServicesTestWrapper

Check failure on line 297 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / lint

too many arguments in call to newServicesTestWrapper

Check failure on line 297 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

too many arguments in call to newServicesTestWrapper

Check failure on line 297 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Find New Flaky Tests In Chainlink Project / Run Tests (github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury, ubuntu-la...

too many arguments in call to newServicesTestWrapper

Check failure on line 297 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

too many arguments in call to newServicesTestWrapper
require.ErrorContains(t, err, "plugin already registered")

// close all services and try again
for i := len(got) - 2; i >= 0; i-- {
require.NoError(t, got[i].Close())
}
_, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg)

Check failure on line 304 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

too many arguments in call to newServicesTestWrapper

Check failure on line 304 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / lint

too many arguments in call to newServicesTestWrapper

Check failure on line 304 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

too many arguments in call to newServicesTestWrapper

Check failure on line 304 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Find New Flaky Tests In Chainlink Project / Run Tests (github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury, ubuntu-la...

too many arguments in call to newServicesTestWrapper

Check failure on line 304 in core/services/ocr2/plugins/mercury/plugin_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

too many arguments in call to newServicesTestWrapper
require.NoError(t, err)
})
}
})
}

// we are only varying the version via feedID (and the plugin config)
Expand Down Expand Up @@ -292,7 +376,7 @@ func (*testProvider) ReportCodecV3() v3.ReportCodec { return nil }
func (*testProvider) ReportCodecV4() v4.ReportCodec { return nil }

// Start implements types.MercuryProvider.
func (*testProvider) Start(context.Context) error { panic("unimplemented") }
func (*testProvider) Start(context.Context) error { return nil }

var _ commontypes.MercuryProvider = (*testProvider)(nil)

Expand Down
2 changes: 1 addition & 1 deletion plugins/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/loop"
)

// RegistrarConfig generates contains static configuration inher
// RegistrarConfig generates contains static configuration
type RegistrarConfig interface {
RegisterLOOP(config CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error)
UnregisterLOOP(ID string)
Expand Down

0 comments on commit 203ddff

Please sign in to comment.