Skip to content

Commit

Permalink
[BCF-2463] Add generic job type for lightweight OCR plugins (#10665)
Browse files Browse the repository at this point in the history
* [BCF-2463] Add generic job type for lightweight OCR plugins

* [chore] Refactor ErrJobSpecNoRelayer; add ErrRelayerNotEnabled

* [feedback] Move validation of PluginConfig to ocr2/validate/validate.go

* Add tests for pluginConfig

* Sensible defaults for command
  • Loading branch information
cedric-cordenier authored Nov 2, 2023
1 parent f7e868e commit a08b705
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 55 deletions.
169 changes: 157 additions & 12 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"time"

"google.golang.org/grpc"
"gopkg.in/guregu/null.v4"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,6 +31,7 @@ import (

relaylogger "github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-relay/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/dkg"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/dkg/persistence"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
Expand Down Expand Up @@ -70,7 +73,28 @@ import (
"github.com/smartcontractkit/chainlink/v2/plugins"
)

var ErrJobSpecNoRelayer = errors.New("OCR2 job spec could not get relayer id")
type ErrJobSpecNoRelayer struct {
PluginName string
Err error
}

func (e ErrJobSpecNoRelayer) Unwrap() error { return e.Err }

func (e ErrJobSpecNoRelayer) Error() string {
return fmt.Sprintf("%s services: OCR2 job spec could not get relayer ID: %s", e.PluginName, e.Err)
}

type ErrRelayNotEnabled struct {
PluginName string
Relay string
Err error
}

func (e ErrRelayNotEnabled) Unwrap() error { return e.Err }

func (e ErrRelayNotEnabled) Error() string {
return fmt.Sprintf("%s services: failed to get relay %s, is it enabled? %s", e.PluginName, e.Relay, e.Err)
}

type RelayGetter interface {
Get(id relay.ID) (loop.Relayer, error)
Expand Down Expand Up @@ -245,7 +269,7 @@ func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error {

rid, err := spec.RelayID()
if err != nil {
d.lggr.Errorw("DeleteJob: "+ErrJobSpecNoRelayer.Error(), "err", err)
d.lggr.Errorw("DeleteJob", "err", ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)})
return nil
}
// we only have clean to do for the EVM
Expand Down Expand Up @@ -337,7 +361,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("ServicesForSpec: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}

if rid.Network == relay.EVM {
Expand Down Expand Up @@ -428,6 +452,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
s4PluginDB := NewDB(d.db, spec.ID, s4PluginId, lggr, d.cfg.Database())
return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.GenericPlugin:
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
}
Expand Down Expand Up @@ -473,6 +500,124 @@ func GetEVMEffectiveTransmitterID(jb *job.Job, chain evm.Chain, lggr logger.Suga
return spec.TransmitterID.String, nil
}

type connProvider interface {
ClientConn() grpc.ClientConnInterface
}

func defaultPathFromPluginName(pluginName string) string {
// By default we install the command on the system path, in the
// form: `chainlink-<plugin name>`
return fmt.Sprintf("chainlink-%s", pluginName)
}

func (d *Delegate) newServicesGenericPlugin(
ctx context.Context,
lggr logger.SugaredLogger,
jb job.Job,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

p := validate.OCR2GenericPluginConfig{}
err = json.Unmarshal(spec.PluginConfig.Bytes(), &p)
if err != nil {
return nil, err
}
cconf := p.CoreConfig

command := cconf.Command
if command == "" {
command = defaultPathFromPluginName(cconf.PluginName)
}

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err}
}

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName}
}

provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{
ExternalJobID: jb.ExternalJobID,
JobID: spec.ID,
ContractID: spec.ContractID,
New: d.isNewlyCreatedJob,
RelayConfig: spec.RelayConfig.Bytes(),
ProviderType: cconf.ProviderType,
}, types.PluginArgs{
TransmitterID: spec.TransmitterID.String,
PluginConfig: spec.PluginConfig.Bytes(),
})
if err != nil {
return nil, err
}
srvs = append(srvs, provider)

oracleEndpoint := d.monitoringEndpointGen.GenMonitoringEndpoint(
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
rid.Network,
rid.ChainID,
)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Database: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: oracleEndpoint,
OffchainKeyring: kb,
OnchainKeyring: kb,
ContractTransmitter: provider.ContractTransmitter(),
ContractConfigTracker: provider.ContractConfigTracker(),
OffchainConfigDigester: provider.OffchainConfigDigester(),
}

pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}

errorLog := &errorLog{jobID: jb.ID, recordError: d.jobORM.RecordError}
providerConn, ok := provider.(connProvider)
if !ok {
return nil, errors.New("provider not supported: the provider is not a LOOPP provider")
}

pluginConfig := types.ReportingPluginServiceConfig{
PluginName: cconf.PluginName,
Command: command,
ProviderType: cconf.ProviderType,
PluginConfig: string(p.PluginConfig),
}

pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
ta := generic.NewTelemetryAdapter(d.monitoringEndpointGen)

plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerConn.ClientConn(), pr, ta, errorLog)
oracleArgs.ReportingPluginFactory = plugin
srvs = append(srvs, plugin)

oracle, err := libocr2.NewOracle(oracleArgs)
if err != nil {
return nil, err
}

srvs = append(srvs, job.NewServiceAdapter(oracle))
return srvs, nil
}

func (d *Delegate) newServicesMercury(
ctx context.Context,
lggr logger.SugaredLogger,
Expand All @@ -498,14 +643,14 @@ func (d *Delegate) newServicesMercury(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("mercury services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "mercury"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("mercury services: expected EVM relayer got %s", rid.Network)
}
relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, fmt.Errorf("failed to get relay %s is it enabled?: %w", spec.Relay, err)
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: "mercury"}
}
chain, err := d.legacyChains.Get(rid.ChainID)
if err != nil {
Expand Down Expand Up @@ -574,7 +719,7 @@ func (d *Delegate) newServicesMedian(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("median services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "median"}
}

oracleArgsNoPlugin := libocr2.OCR2OracleArgs{
Expand All @@ -593,7 +738,7 @@ func (d *Delegate) newServicesMedian(

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, fmt.Errorf("median services; failed to get relay %s is it enabled?: %w", spec.Relay, err)
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
Expand All @@ -618,7 +763,7 @@ func (d *Delegate) newServicesDKG(
spec := jb.OCR2OracleSpec
rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("DKG services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "DKG"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("DKG services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -687,7 +832,7 @@ func (d *Delegate) newServicesOCR2VRF(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("VRF services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "VRF"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("VRF services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -912,7 +1057,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
mc := d.cfg.Mercury().Credentials(credName)
rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("keeper2 services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "keeper2"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("keeper2 services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -1026,7 +1171,7 @@ func (d *Delegate) newServicesOCR2Keepers20(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("keepers2.0 services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "keepers2.0"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("keepers2.0 services: expected EVM relayer got %s", rid.Network)
Expand Down Expand Up @@ -1161,7 +1306,7 @@ func (d *Delegate) newServicesOCR2Functions(

rid, err := spec.RelayID()
if err != nil {
return nil, fmt.Errorf("functions services: %w: %w", ErrJobSpecNoRelayer, err)
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: "functions"}
}
if rid.Network != relay.EVM {
return nil, fmt.Errorf("functions services: expected EVM relayer got %s", rid.Network)
Expand Down
7 changes: 7 additions & 0 deletions core/services/ocr2/plugins/generic/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package generic

import "github.com/smartcontractkit/libocr/commontypes"

func (t *TelemetryAdapter) Endpoints() map[[4]string]commontypes.MonitoringEndpoint {
return t.endpoints
}
32 changes: 32 additions & 0 deletions core/services/ocr2/plugins/generic/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package generic

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMerge(t *testing.T) {
vars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
},
}
addedVars := map[string]interface{}{
"jb": map[string]interface{}{
"some-other-var": "foo",
},
"val": 0,
}

merge(vars, addedVars)

assert.True(t, reflect.DeepEqual(vars, map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
"some-other-var": "foo",
},
"val": 0,
}), vars)
}
Loading

0 comments on commit a08b705

Please sign in to comment.