From 841878492df30484a16a480adf8900e640a83be7 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Tue, 10 Dec 2024 05:23:36 -0800 Subject: [PATCH 1/7] Increase default timeout of remote Executable requests (#15587) --- core/capabilities/launcher.go | 3 ++- core/capabilities/remote/executable/client.go | 10 ++++++++-- core/capabilities/remote/executable/endtoend_test.go | 2 +- core/capabilities/remote/executable/server.go | 8 ++++++-- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index e75f2ebbc8f..a8cad163cee 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -387,7 +387,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability } var ( - defaultTargetRequestTimeout = time.Minute + // TODO: make this configurable + defaultTargetRequestTimeout = 8 * time.Minute ) func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error { diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index 9af32eb5f8e..776ddb692ad 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -41,6 +41,8 @@ var _ commoncap.ExecutableCapability = &client{} var _ types.Receiver = &client{} var _ services.Service = &client{} +const expiryCheckInterval = 30 * time.Second + func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *client { return &client{ @@ -98,7 +100,11 @@ func (c *client) checkDispatcherReady() { } func (c *client) checkForExpiredRequests() { - ticker := time.NewTicker(c.requestTimeout) + tickerInterval := expiryCheckInterval + if c.requestTimeout < tickerInterval { + tickerInterval = c.requestTimeout + } + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() for { select { @@ -116,7 +122,7 @@ func (c *client) expireRequests() { for messageID, req := range c.requestIDToCallerRequest { if req.Expired() { - req.Cancel(errors.New("request expired")) + req.Cancel(errors.New("request expired by executable client")) delete(c.requestIDToCallerRequest, messageID) } diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 376b4d5852f..5e0a439d4ab 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -156,7 +156,7 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.Equal(t, "error executing request: request expired", responseError.Error()) + assert.Equal(t, "error executing request: request expired by executable client", responseError.Error()) }) }) diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index b767a2d7030..d43c7ab5c41 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -87,7 +87,11 @@ func (r *server) Start(ctx context.Context) error { r.wg.Add(1) go func() { 
defer r.wg.Done() - ticker := time.NewTicker(r.requestTimeout) + tickerInterval := expiryCheckInterval + if r.requestTimeout < tickerInterval { + tickerInterval = r.requestTimeout + } + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() r.lggr.Info("executable capability server started") for { @@ -118,7 +122,7 @@ func (r *server) expireRequests() { for requestID, executeReq := range r.requestIDToRequest { if executeReq.request.Expired() { - err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired") + err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired by executable server") if err != nil { r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err) } From 02e67cb045d40c00cd0c32faf4bb60183f6ce9b1 Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:00:21 -0700 Subject: [PATCH 2/7] Ks 616/aptos hot fix (#15619) * isolate fix to mercury; more tests --- .changeset/big-camels-report.md | 5 + core/services/ocr2/plugins/mercury/plugin.go | 89 ++++++++---- .../ocr2/plugins/mercury/plugin_test.go | 133 ++++++++++++++++-- plugins/registrar.go | 2 +- 4 files changed, 190 insertions(+), 39 deletions(-) create mode 100644 .changeset/big-camels-report.md diff --git a/.changeset/big-camels-report.md b/.changeset/big-camels-report.md new file mode 100644 index 00000000000..c6b0ed94df2 --- /dev/null +++ b/.changeset/big-camels-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix fix missing unregister in mercury job loop diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 8a4101804dd..b0983e55c89 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -1,6 +1,7 @@ package mercury import ( + "context" "encoding/json" "fmt" "os/exec" @@ -79,14 +80,13 @@ func NewServices( return nil, errors.New("expected job to have a non-nil PipelineSpec") } - var err error var pluginConfig config.PluginConfig if len(jb.OCR2OracleSpec.PluginConfig) == 0 { if !enableTriggerCapability { return nil, fmt.Errorf("at least one transmission option must be configured") } } else { - err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) + err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) if err != nil { return nil, errors.WithStack(err) } @@ -101,8 +101,8 @@ func NewServices( // encapsulate all the subservices and ensure we close them all if any fail to start srvs := []job.ServiceCtx{ocr2Provider} abort := func() { - if err = services.MultiCloser(srvs).Close(); err != nil { - lggr.Errorw("Error closing unused services", "err", err) + if cerr := services.MultiCloser(srvs).Close(); cerr != nil { + lggr.Errorw("Error closing unused services", "err", cerr) } } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) @@ -112,6 +112,7 @@ func NewServices( var ( factory ocr3types.MercuryPluginFactory factoryServices []job.ServiceCtx + fErr error ) fCfg := factoryCfg{ orm: orm, @@ -127,31 +128,31 @@ func NewServices( } switch feedID.Version() { case 1: - factory, factoryServices, err = newv1factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv1factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr) } srvs = append(srvs, factoryServices...) 
case 2: - factory, factoryServices, err = newv2factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv2factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 3: - factory, factoryServices, err = newv3factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv3factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 4: - factory, factoryServices, err = newv4factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv4factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr) } srvs = append(srvs, factoryServices...) default: @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loop mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. 
loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. return factory, srvs, nil } -func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) { +func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) { lggr.Debugw("Initializing Mercury loop", "command", cmd) mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String()) envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) } + loopID := mercuryLggr.Name() cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{ - ID: mercuryLggr.Name(), + ID: loopID, Cmd: cmd, Env: envVars, }) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + } + return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil +} + +// loopUnregisterCloser is a helper to unregister a loop +// as a service +// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern +// perhaps it can be implemented in the delegate on job delete. 
+type loopUnregisterCloser struct { + r plugins.RegistrarConfig + id string +} + +func (l *loopUnregisterCloser) Close() error { + l.r.UnregisterLOOP(l.id) + return nil +} + +func (l *loopUnregisterCloser) Start(ctx context.Context) error { + return nil +} + +func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser { + return &loopUnregisterCloser{ + r: r, + id: id, } - return cmdFn, opts, mercuryLggr, nil } diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 22aaf7522de..eb67da53100 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -2,6 +2,7 @@ package mercury_test import ( "context" + "errors" "os/exec" "reflect" "testing" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -22,6 +24,7 @@ import ( v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" v4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury" @@ -92,21 +95,23 @@ var ( // this is kind of gross, but it's the best way to test return values of the services expectedEmbeddedServiceCnt = 3 - expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 1 + expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 2 // factory server and loop unregisterer ) func TestNewServices(t *testing.T) { type args struct { pluginConfig job.JSONConfig feedID utils.FeedID + cfg mercuryocr2.Config } - tests := []struct { + testCases := []struct { name string args args loopMode bool wantLoopFactory any wantServiceCnt int wantErr bool + wantErrStr string }{ { name: "no plugin config error ", @@ -186,6 +191,19 @@ func TestNewServices(t *testing.T) { wantErr: false, wantLoopFactory: &loop.MercuryV3Service{}, }, + { + name: "v3 loop err", + loopMode: true, + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: mercuryocr2.NewMercuryConfig(1, 1, &testRegistrarConfig{failRegister: true}), + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: true, + wantLoopFactory: &loop.MercuryV3Service{}, + wantErrStr: "failed to init loop for feed", + }, { name: "v4 loop", loopMode: true, @@ -198,17 +216,27 @@ func TestNewServices(t *testing.T) { wantLoopFactory: &loop.MercuryV4Service{}, }, } - for _, tt := range tests { + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { if tt.loopMode { t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) } - got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID) + // use default config if not provided + if tt.args.cfg == nil { + tt.args.cfg = testCfg + } + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) if (err != nil) != tt.wantErr { t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) return } + if err != nil { + if tt.wantErrStr != "" { + assert.Contains(t, err.Error(), tt.wantErrStr) + } + return + } assert.Len(t, got, tt.wantServiceCnt) if tt.loopMode { foundLoopFactory := false @@ -222,15 +250,97 @@ func TestNewServices(t *testing.T) { 
} }) } + + t.Run("restartable loop", func(t *testing.T) { + // setup a real loop registry to test restartability + registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "") + loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister) + prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig) + type args struct { + pluginConfig job.JSONConfig + feedID utils.FeedID + cfg mercuryocr2.Config + } + testCases := []struct { + name string + args args + wantErr bool + }{ + { + name: "v1 loop", + args: args{ + pluginConfig: v1jsonCfg, + feedID: v1FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v2 loop", + args: args{ + pluginConfig: v2jsonCfg, + feedID: v2FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v3 loop", + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v4 loop", + args: args{ + pluginConfig: v4jsonCfg, + feedID: v4FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") + assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) + + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) + return + } + // hack to simulate a restart. we don't have enough boilerplate to start the oracle service + // only care about the subservices so we start all except the oracle, which happens to be the last one + for i := 0; i < len(got)-1; i++ { + require.NoError(t, got[i].Start(tests.Context(t))) + } + // if we don't close the services, we get conflicts with the loop registry + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.ErrorContains(t, err, "plugin already registered") + + // close all services and try again + for i := len(got) - 2; i >= 0; i-- { + require.NoError(t, got[i].Close()) + } + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.NoError(t, err) + }) + } + }) } // we are only varying the version via feedID (and the plugin config) // this wrapper supplies dummy values for the rest of the arguments -func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) { +func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID, cfg mercuryocr2.Config) ([]job.ServiceCtx, error) { t.Helper() jb := testJob jb.OCR2OracleSpec.PluginConfig = pluginConfig - return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID, false) + return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, cfg, nil, &testDataSourceORM{}, feedID, false) } type testProvider struct{} @@ -292,16 +402,21 @@ func (*testProvider) ReportCodecV3() v3.ReportCodec { return nil } func (*testProvider) ReportCodecV4() v4.ReportCodec { return nil } // Start implements types.MercuryProvider. 
-func (*testProvider) Start(context.Context) error { panic("unimplemented") } +func (*testProvider) Start(context.Context) error { return nil } var _ commontypes.MercuryProvider = (*testProvider)(nil) -type testRegistrarConfig struct{} +type testRegistrarConfig struct { + failRegister bool +} func (c *testRegistrarConfig) UnregisterLOOP(ID string) {} // RegisterLOOP implements plugins.RegistrarConfig. -func (*testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { +func (c *testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { + if c.failRegister { + return nil, loop.GRPCOpts{}, errors.New("failed to register") + } return nil, loop.GRPCOpts{}, nil } diff --git a/plugins/registrar.go b/plugins/registrar.go index 2a82f2a6204..8523d3980cc 100644 --- a/plugins/registrar.go +++ b/plugins/registrar.go @@ -6,7 +6,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" ) -// RegistrarConfig generates contains static configuration inher +// RegistrarConfig generates contains static configuration type RegistrarConfig interface { RegisterLOOP(config CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) UnregisterLOOP(ID string) From 7cf5b2fe4ea42277cd05bbc3e63159d53a8f77f7 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:54:24 -0800 Subject: [PATCH 3/7] [Keystone] Standard error message for remote execution errors (#15615) --- .../capabilities/remote/executable/endtoend_test.go | 6 +++--- .../remote/executable/request/server_request.go | 13 +++++++++---- .../executable/request/server_request_test.go | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 5e0a439d4ab..9b67eb51a55 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -132,7 +132,7 @@ func Test_RemoteExecutionCapability_CapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.Equal(t, "error executing request: failed to execute capability: an error", responseError.Error()) + assert.Equal(t, "error executing request: failed to execute capability", responseError.Error()) }) }) @@ -156,12 +156,12 @@ func Test_RemoteExecutableCapability_RandomCapabilityError(t *testing.T) { methods = append(methods, func(ctx context.Context, caller commoncap.ExecutableCapability) { executeCapability(ctx, t, caller, transmissionSchedule, func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error) { - assert.Equal(t, "error executing request: request expired by executable client", responseError.Error()) + assert.Equal(t, "error executing request: failed to execute capability", responseError.Error()) }) }) for _, method := range methods { - testRemoteExecutableCapability(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute, + testRemoteExecutableCapability(ctx, t, capability, 10, 9, 1*time.Second, 10, 9, 10*time.Minute, method) } } diff --git a/core/capabilities/remote/executable/request/server_request.go b/core/capabilities/remote/executable/request/server_request.go index a4662e93987..629622494a4 100644 --- 
a/core/capabilities/remote/executable/request/server_request.go +++ b/core/capabilities/remote/executable/request/server_request.go @@ -2,6 +2,7 @@ package request import ( "context" + "errors" "fmt" "sync" "time" @@ -48,6 +49,8 @@ type ServerRequest struct { lggr logger.Logger } +var errExternalErrorMsg = errors.New("failed to execute capability") + func NewServerRequest(capability capabilities.ExecutableCapability, method string, capabilityID string, capabilityDonID uint32, capabilityPeerID p2ptypes.PeerID, callingDon commoncap.DON, requestID string, @@ -228,20 +231,22 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit payload []byte) ([]byte, error) { capabilityRequest, err := pb.UnmarshalCapabilityRequest(payload) if err != nil { - return nil, fmt.Errorf("failed to unmarshal capability request: %w", err) + lggr.Errorw("failed to unmarshal capability request", "err", err) + return nil, errExternalErrorMsg } lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata) capResponse, err := capability.Execute(ctx, capabilityRequest) if err != nil { - lggr.Debugw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err) - return nil, fmt.Errorf("failed to execute capability: %w", err) + lggr.Errorw("received execution error", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", err) + return nil, errExternalErrorMsg } responsePayload, err := pb.MarshalCapabilityResponse(capResponse) if err != nil { - return nil, fmt.Errorf("failed to marshal capability response: %w", err) + lggr.Errorw("failed to marshal capability request", "err", err) + return nil, errExternalErrorMsg } lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID) diff --git a/core/capabilities/remote/executable/request/server_request_test.go b/core/capabilities/remote/executable/request/server_request_test.go index cbeec833a1f..ce539154d93 100644 --- a/core/capabilities/remote/executable/request/server_request_test.go +++ b/core/capabilities/remote/executable/request/server_request_test.go @@ -136,9 +136,9 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { require.NoError(t, err) assert.Len(t, dispatcher.msgs, 2) assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[0].Error) - assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[0].ErrorMsg) + assert.Equal(t, "failed to execute capability", dispatcher.msgs[0].ErrorMsg) assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[1].Error) - assert.Equal(t, "failed to execute capability: an error", dispatcher.msgs[1].ErrorMsg) + assert.Equal(t, "failed to execute capability", dispatcher.msgs[1].ErrorMsg) }) t.Run("Execute capability", func(t *testing.T) { From 8df27e05b4a2990b99a2c9b5a531a2a9e088aaf4 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Tue, 10 Dec 2024 18:07:58 -0800 Subject: [PATCH 4/7] Bump common to release/2.19.0-aptos branch --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 ++-- deployment/go.mod | 2 +- deployment/go.sum | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 ++-- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 6a9d505f233..897d5d917de 100644 --- a/core/scripts/go.mod +++ 
b/core/scripts/go.mod @@ -24,7 +24,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v0.8.1 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/chainlink/v2 v2.14.0-mercury-20240807.0.20241106193309-5560cd76211a github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 313ab7cd865..c56f9a98224 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1409,8 +1409,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/deployment/go.mod b/deployment/go.mod index 9bfee2dcd60..058dda00946 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -23,7 +23,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240926212305-a6deabdfce86 github.com/smartcontractkit/chain-selectors v1.0.29 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/deployment/go.sum b/deployment/go.sum index e313ac86693..6a74c588f8f 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1384,8 +1384,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= 
-github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/go.mod b/go.mod index 1b11bac5280..e80ee7a7ba2 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.29 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e github.com/smartcontractkit/chainlink-feeds v0.1.1 diff --git a/go.sum b/go.sum index a9dfae984b3..f5040b0a9f0 100644 --- a/go.sum +++ b/go.sum @@ -1078,8 +1078,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 3c31f638226..9071e0ae4df 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -37,7 +37,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.29 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec - 
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.2 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.14 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index a8806992121..33039b778df 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1405,8 +1405,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 11dea3b3580..a06702405ba 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -17,7 +17,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.15.0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 + github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.14 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.2 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 13fd67ed34a..c9354af1599 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1394,8 +1394,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec h1:5vS1k8Qn09p8SQ3JzvS8iy4Pve7s3aVq+UPIdl74smY= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241118091009-43c2b4804cec/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068 h1:2llRW4Tn9W/EZp2XvXclQ9IjeTBwwxVPrrqaerX+vCE= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241114134822-aadff98ef068/go.mod 
h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f h1:RZ90dXrz+nQsM5+7Rz/+ZvUs9WgZj1ZqGuRxtsMwgV8= +github.com/smartcontractkit/chainlink-common v0.3.1-0.20241210195010-36d99fa35f9f/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg= From 31cb374fd40e22d2570fcb51b3cd3d98b84da544 Mon Sep 17 00:00:00 2001 From: Patrick Date: Fri, 6 Dec 2024 09:36:48 -0500 Subject: [PATCH 5/7] wiring engine histogram buckets through to beholder Client (#15508) * wiring engine histogram buckets through to beholder Client * bumping common * Move metric views (#15515) * bumping common --------- Co-authored-by: Pavel <177363085+pkcll@users.noreply.github.com> --- core/cmd/shell.go | 5 +++++ core/services/workflows/monitoring.go | 32 +++++++++++++++++++++++++++ go.mod | 2 +- 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 966fa1a0ff8..829ed9d55ce 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -51,6 +51,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/store/migrate" @@ -109,6 +110,10 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme AuthPublicKeyHex: csaPubKeyHex, AuthHeaders: beholderAuthHeaders, } + // note: due to the OTEL specification, all histogram buckets + // must be defined when the beholder client is created + clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...) 
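The note above captures the key constraint: the OTEL Go SDK applies views only when the MeterProvider is constructed, so histogram bucket boundaries cannot be attached to an instrument after the beholder client exists. A minimal standalone sketch of the same mechanism (the instrument name and boundaries are taken from this patch; the provider wiring here is illustrative, not the actual beholder setup):

package main

import (
	"context"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// A view that pins explicit bucket boundaries for one instrument, matched by name.
	view := sdkmetric.NewView(
		sdkmetric.Instrument{Name: "platform_engine_workflow_step_time_seconds"},
		sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
			Boundaries: []float64{0, 20, 60, 120, 240},
		}},
	)

	// Views are fixed at provider construction; histograms created from this
	// provider's meters observe the boundaries above, and there is no API to
	// add or change buckets afterwards.
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithView(view))
	defer func() { _ = provider.Shutdown(context.Background()) }()

	hist, err := provider.Meter("example").Int64Histogram("platform_engine_workflow_step_time_seconds")
	if err != nil {
		panic(err)
	}
	hist.Record(context.Background(), 42)
}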
+ if tracingCfg.Enabled { clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter() if err != nil { diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index d498ff354c9..f4e993d8c6f 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -5,6 +5,7 @@ import ( "fmt" "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/metrics" @@ -53,6 +54,37 @@ func initMonitoringResources() (err error) { return nil } +// Note: due to the OTEL specification, all histogram buckets +// Must be defined when the beholder client is created +func MetricViews() []sdkmetric.View { + return []sdkmetric.View{ + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_earlyexit_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 1, 10, 100}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_completed_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 100, 1000, 10_000, 50_000, 100_0000, 500_000}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_error_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 20, 60, 120, 240}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_step_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 20, 60, 120, 240}, + }}, + ), + } +} + // workflowsMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities // for monitoring resources type workflowsMetricLabeler struct { diff --git a/go.mod b/go.mod index e80ee7a7ba2..f532f3d9a6f 100644 --- a/go.mod +++ b/go.mod @@ -104,6 +104,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.49.0 go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/metric v1.31.0 + go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 @@ -361,7 +362,6 @@ require ( go.opentelemetry.io/otel/log v0.6.0 // indirect go.opentelemetry.io/otel/sdk v1.31.0 // indirect go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/ratelimit v0.3.0 // indirect golang.org/x/arch v0.11.0 // indirect From 79d0047f48ae138a56397262c8b74f91949a304e Mon Sep 17 00:00:00 2001 From: Patrick Date: Mon, 25 Nov 2024 12:39:28 -0500 Subject: [PATCH 6/7] fixing bug in labeler (#15406) --- core/services/workflows/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index ecb3ce60510..6ac87055731 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -1263,7 +1263,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { engine = &Engine{ cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, 
platform.KeyWorkflowName, workflow.name)}, + metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)}, registry: cfg.Registry, workflow: workflow, secretsFetcher: cfg.SecretsFetcher, From ea79473f7487b1cf72a864f0d0f855b1f495caf0 Mon Sep 17 00:00:00 2001 From: Vyzaldy Sanchez Date: Wed, 27 Nov 2024 19:11:44 -0400 Subject: [PATCH 7/7] Add beholder metrics on workflow engine (#15238) * Adds metrics on workflow engine * Adds trigger event metric * Removes comment * metrics: execution duration histograms by status and removing now redundant instrumentation * adding step execution time histogram * fixing data race for global instruments * cleanup + fixing tests * renaming vars somehow fixes broken test * removing short circuit in workferForStepRequest if Vertex call fails * nil guard if Vertex errs * updating workflow.name to workflow.hexName and fixing err log --------- Co-authored-by: patrickhuie19 --- core/services/workflows/engine.go | 84 ++++++--- core/services/workflows/models.go | 6 +- core/services/workflows/monitoring.go | 201 +++++++++++++++++---- core/services/workflows/monitoring_test.go | 5 +- 4 files changed, 229 insertions(+), 67 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 6ac87055731..0ac26ad8de4 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -142,11 +142,7 @@ func (e *Engine) Start(_ context.Context) error { // create a new context, since the one passed in via Start is short-lived. ctx, _ := e.stopCh.NewCtx() - // spin up monitoring resources - err := initMonitoringResources() - if err != nil { - return fmt.Errorf("could not initialize monitoring resources: %w", err) - } + e.metrics.incrementWorkflowInitializationCounter(ctx) e.wg.Add(e.maxWorkerLimit) for i := 0; i < e.maxWorkerLimit; i++ { @@ -358,6 +354,7 @@ func (e *Engine) init(ctx context.Context) { e.logger.Info("engine initialized") logCustMsg(ctx, e.cma, "workflow registered", e.logger) + e.metrics.incrementWorkflowRegisteredCounter(ctx) e.afterInit(true) } @@ -439,7 +436,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, WorkflowOwner: e.workflow.owner, - WorkflowName: e.workflow.name, + WorkflowName: e.workflow.hexName, WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: t.Ref, @@ -678,7 +675,6 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) { func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error { l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status) - metrics := e.metrics.with("status", status) l.Info("finishing execution") @@ -692,18 +688,28 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter return err } - executionDuration := execState.FinishedAt.Sub(*execState.CreatedAt).Milliseconds() - e.stepUpdatesChMap.remove(executionID) - metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len()) - metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration) + + executionDuration := int64(execState.FinishedAt.Sub(*execState.CreatedAt).Seconds()) + switch status { + case store.StatusCompleted: + e.metrics.updateWorkflowCompletedDurationHistogram(ctx, 
executionDuration) + case store.StatusCompletedEarlyExit: + e.metrics.updateWorkflowEarlyExitDurationHistogram(ctx, executionDuration) + case store.StatusErrored: + e.metrics.updateWorkflowErrorDurationHistogram(ctx, executionDuration) + case store.StatusTimeout: + // should expect the same values unless the timeout is adjusted. + // using histogram as it gives count of executions for free + e.metrics.updateWorkflowTimeoutDurationHistogram(ctx, executionDuration) + } if executionDuration > fifteenMinutesMs { - logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l) - l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration) + logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration), l) + l.Warnf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration) } - logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l) - l.Infof("execution duration: %d", executionDuration) + logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l) + l.Infof("execution duration: %d (seconds)", executionDuration) e.onExecutionFinished(executionID) return nil } @@ -747,6 +753,7 @@ func (e *Engine) worker(ctx context.Context) { if err != nil { e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err) logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger) + e.metrics.with(platform.KeyTriggerID, te.ID).incrementTriggerWorkflowStarterErrorCounter(ctx) } else { e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started") logCustMsg(ctx, cma, "execution started", e.logger) @@ -770,10 +777,21 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { Ref: msg.stepRef, } - // TODO ks-462 inputs logCustMsg(ctx, cma, "executing step", l) + stepExecutionStartTime := time.Now() inputs, outputs, err := e.executeStep(ctx, l, msg) + stepExecutionDuration := time.Since(stepExecutionStartTime).Seconds() + + curStepID := "UNSET" + curStep, verr := e.workflow.Vertex(msg.stepRef) + if verr == nil { + curStepID = curStep.ID + } else { + l.Errorf("failed to resolve step in workflow; error %v", verr) + } + e.metrics.with(platform.KeyCapabilityID, curStepID).updateWorkflowStepDurationHistogram(ctx, int64(stepExecutionDuration)) + var stepStatus string switch { case errors.Is(capabilities.ErrStopExecution, err): @@ -850,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name) + secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.hexName) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } @@ -894,16 +912,16 @@ func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *st // executeStep executes the referenced capability within a step and returns the result. 
func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) { - step, err := e.workflow.Vertex(msg.stepRef) + curStep, err := e.workflow.Vertex(msg.stepRef) if err != nil { return nil, nil, err } var inputs any - if step.Inputs.OutputRef != "" { - inputs = step.Inputs.OutputRef + if curStep.Inputs.OutputRef != "" { + inputs = curStep.Inputs.OutputRef } else { - inputs = step.Inputs.Mapping + inputs = curStep.Inputs.Mapping } i, err := exec.FindAndInterpolateAllKeys(inputs, msg.state) @@ -916,7 +934,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe return nil, nil, err } - config, err := e.configForStep(ctx, lggr, step) + config, err := e.configForStep(ctx, lggr, curStep) if err != nil { return nil, nil, err } @@ -942,7 +960,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe WorkflowID: msg.state.WorkflowID, WorkflowExecutionID: msg.state.ExecutionID, WorkflowOwner: e.workflow.owner, - WorkflowName: e.workflow.name, + WorkflowName: e.workflow.hexName, WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: msg.stepRef, @@ -952,9 +970,10 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration) defer cancel() - e.metrics.incrementCapabilityInvocationCounter(stepCtx) - output, err := step.capability.Execute(stepCtx, tr) + e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx) + output, err := curStep.capability.Execute(stepCtx, tr) if err != nil { + e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx) return inputsMap, nil, err } @@ -967,7 +986,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr WorkflowID: e.workflow.id, WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, - WorkflowName: e.workflow.name, + WorkflowName: e.workflow.hexName, WorkflowOwner: e.workflow.owner, ReferenceID: t.Ref, }, @@ -1074,6 +1093,7 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf return workflowProcessed, store.StatusCompleted, nil } +// heartbeat runs by default every defaultHeartbeatCadence minutes func (e *Engine) heartbeat(ctx context.Context) { defer e.wg.Done() @@ -1087,6 +1107,7 @@ func (e *Engine) heartbeat(ctx context.Context) { return case <-ticker.C: e.metrics.incrementEngineHeartbeatCounter(ctx) + e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len()) logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger) } } @@ -1153,6 +1174,7 @@ func (e *Engine) Close() error { return err } logCustMsg(ctx, e.cma, "workflow unregistered", e.logger) + e.metrics.incrementWorkflowUnregisteredCounter(ctx) return nil }) } @@ -1249,6 +1271,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { // - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist) // - etc. 
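The with(...) calls threaded through workerForStepRequest and executeStep above rely on a copy-on-write labeler: each call returns a new labeler carrying the extra key/value pairs, so per-step attributes never mutate the engine-wide label set shared across worker goroutines. A reduced sketch of that pattern under assumed names (the real implementation wraps chainlink-common's metrics.Labeler):

package main

import "fmt"

type labeler struct {
	labels map[string]string
}

// with returns an extended copy; the receiver is never mutated, which keeps
// concurrent steps from racing on a shared label map.
func (l labeler) with(kvs ...string) labeler {
	out := labeler{labels: make(map[string]string, len(l.labels)+len(kvs)/2)}
	for k, v := range l.labels {
		out.labels[k] = v
	}
	for i := 0; i+1 < len(kvs); i += 2 {
		out.labels[kvs[i]] = kvs[i+1]
	}
	return out
}

func main() {
	base := labeler{labels: map[string]string{"workflowID": "wf-1"}}
	step := base.with("capabilityID", "write_chain@1.0.0")
	fmt.Println(step.labels) // map[capabilityID:write_chain@1.0.0 workflowID:wf-1]
	fmt.Println(base.labels) // map[workflowID:wf-1], unchanged
}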
+ // spin up monitoring resources + em, err := initMonitoringResources() + if err != nil { + return nil, fmt.Errorf("could not initialize monitoring resources: %w", err) + } + cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName) workflow, err := Parse(cfg.Workflow) if err != nil { @@ -1258,12 +1286,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { workflow.id = cfg.WorkflowID workflow.owner = cfg.WorkflowOwner - workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName)) + workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName)) engine = &Engine{ cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)}, + metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em}, registry: cfg.Registry, workflow: workflow, secretsFetcher: cfg.SecretsFetcher, diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index 0faf66d9883..e5d26a474f6 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -20,9 +20,9 @@ import ( // treated differently due to their nature of being the starting // point of a workflow. type workflow struct { - id string - owner string - name string + id string + owner string + hexName string graph.Graph[string, *step] triggers []*triggerCapability diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index f4e993d8c6f..8457dadeb60 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -10,48 +10,130 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/metrics" - localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring" + monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring" ) -var registerTriggerFailureCounter metric.Int64Counter -var workflowsRunningGauge metric.Int64Gauge -var capabilityInvocationCounter metric.Int64Counter -var workflowExecutionLatencyGauge metric.Int64Gauge // ms -var workflowStepErrorCounter metric.Int64Counter -var engineHeartbeatCounter metric.Int64UpDownCounter +// em AKA "engine metrics" is to locally scope these instruments to avoid +// data races in testing +type engineMetrics struct { + registerTriggerFailureCounter metric.Int64Counter + triggerWorkflowStarterErrorCounter metric.Int64Counter + workflowsRunningGauge metric.Int64Gauge + capabilityInvocationCounter metric.Int64Counter + capabilityFailureCounter metric.Int64Counter + workflowRegisteredCounter metric.Int64Counter + workflowUnregisteredCounter metric.Int64Counter + workflowExecutionLatencyGauge metric.Int64Gauge // ms + workflowStepErrorCounter metric.Int64Counter + workflowInitializationCounter metric.Int64Counter + engineHeartbeatCounter metric.Int64Counter + workflowCompletedDurationSeconds metric.Int64Histogram + workflowEarlyExitDurationSeconds metric.Int64Histogram + workflowErrorDurationSeconds metric.Int64Histogram + workflowTimeoutDurationSeconds metric.Int64Histogram + workflowStepDurationSeconds metric.Int64Histogram +} + +func 
initMonitoringResources() (em *engineMetrics, err error) { + em = &engineMetrics{} + em.registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures") + if err != nil { + return nil, fmt.Errorf("failed to register trigger failure counter: %w", err) + } + + em.triggerWorkflowStarterErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_triggerworkflow_starter_errors") + if err != nil { + return nil, fmt.Errorf("failed to register trigger workflow starter error counter: %w", err) + } + + em.workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count") + if err != nil { + return nil, fmt.Errorf("failed to register workflows running gauge: %w", err) + } + + em.capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count") + if err != nil { + return nil, fmt.Errorf("failed to register capability invocation counter: %w", err) + } -func initMonitoringResources() (err error) { - registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures") + em.capabilityFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_failures") if err != nil { - return fmt.Errorf("failed to register trigger failure counter: %w", err) + return nil, fmt.Errorf("failed to register capability failure counter: %w", err) } - workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count") + em.workflowRegisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_registered_count") if err != nil { - return fmt.Errorf("failed to register workflows running gauge: %w", err) + return nil, fmt.Errorf("failed to register workflow registered counter: %w", err) } - capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count") + em.workflowUnregisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_unregistered_count") if err != nil { - return fmt.Errorf("failed to register capability invocation counter: %w", err) + return nil, fmt.Errorf("failed to register workflow unregistered counter: %w", err) } - workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_time") + em.workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge( + "platform_engine_workflow_time", + metric.WithUnit("ms")) if err != nil { - return fmt.Errorf("failed to register workflow execution latency gauge: %w", err) + return nil, fmt.Errorf("failed to register workflow execution latency gauge: %w", err) } - workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors") + em.workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations") if err != nil { - return fmt.Errorf("failed to register workflow step error counter: %w", err) + return nil, fmt.Errorf("failed to register workflow initialization counter: %w", err) } - engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("platform_engine_heartbeat") + em.workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors") if err != nil { - return fmt.Errorf("failed to register engine heartbeat counter: %w", err) + return nil, fmt.Errorf("failed to register workflow step error counter: %w", err) } - return nil + em.engineHeartbeatCounter, err = beholder.GetMeter().Int64Counter("platform_engine_heartbeat") 
+ if err != nil { + return nil, fmt.Errorf("failed to register engine heartbeat counter: %w", err) + } + + em.workflowCompletedDurationSeconds, err = beholder.GetMeter().Int64Histogram( + "platform_engine_workflow_completed_time_seconds", + metric.WithDescription("Distribution of completed execution latencies"), + metric.WithUnit("seconds")) + if err != nil { + return nil, fmt.Errorf("failed to register completed duration histogram: %w", err) + } + + em.workflowEarlyExitDurationSeconds, err = beholder.GetMeter().Int64Histogram( + "platform_engine_workflow_earlyexit_time_seconds", + metric.WithDescription("Distribution of earlyexit execution latencies"), + metric.WithUnit("seconds")) + if err != nil { + return nil, fmt.Errorf("failed to register early exit duration histogram: %w", err) + } + + em.workflowErrorDurationSeconds, err = beholder.GetMeter().Int64Histogram( + "platform_engine_workflow_error_time_seconds", + metric.WithDescription("Distribution of error execution latencies"), + metric.WithUnit("seconds")) + if err != nil { + return nil, fmt.Errorf("failed to register error duration histogram: %w", err) + } + + em.workflowTimeoutDurationSeconds, err = beholder.GetMeter().Int64Histogram( + "platform_engine_workflow_timeout_time_seconds", + metric.WithDescription("Distribution of timeout execution latencies"), + metric.WithUnit("seconds")) + if err != nil { + return nil, fmt.Errorf("failed to register timeout duration histogram: %w", err) + } + + em.workflowStepDurationSeconds, err = beholder.GetMeter().Int64Histogram( + "platform_engine_workflow_step_time_seconds", + metric.WithDescription("Distribution of step execution times"), + metric.WithUnit("seconds")) + if err != nil { + return nil, fmt.Errorf("failed to register step execution time histogram: %w", err) + } + + return em, nil } // Note: due to the OTEL specification, all histogram buckets @@ -89,38 +171,89 @@ func MetricViews() []sdkmetric.View { // for monitoring resources type workflowsMetricLabeler struct { metrics.Labeler + em engineMetrics } func (c workflowsMetricLabeler) with(keyValues ...string) workflowsMetricLabeler { - return workflowsMetricLabeler{c.With(keyValues...)} + return workflowsMetricLabeler{c.With(keyValues...), c.em} } func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } func (c workflowsMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) + otelLabels := 
monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) } func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) } func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { - otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) - engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowCompletedDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowEarlyExitDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowErrorDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowTimeoutDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowStepDurationSeconds.Record(ctx, duration, 
metric.WithAttributes(otelLabels...)) } diff --git a/core/services/workflows/monitoring_test.go b/core/services/workflows/monitoring_test.go index 5910e583c95..5b7177e51dc 100644 --- a/core/services/workflows/monitoring_test.go +++ b/core/services/workflows/monitoring_test.go @@ -9,11 +9,12 @@ import ( ) func Test_InitMonitoringResources(t *testing.T) { - require.NoError(t, initMonitoringResources()) + _, err := initMonitoringResources() + require.NoError(t, err) } func Test_WorkflowMetricsLabeler(t *testing.T) { - testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler()} + testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler(), engineMetrics{}} testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz") require.EqualValues(t, testWorkflowsMetricLabeler2.Labels["foo"], "baz") }
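
A note on the monitoring refactor above: moving the instruments off package-level vars and onto the engineMetrics struct gives each engine (and each parallel test) its own instrument handles. The sketch below reproduces that shape using only the upstream OpenTelemetry SDK; the initMetrics name, the meter wiring, and the workflowID attribute are illustrative assumptions, not part of this patch.

    package main

    import (
    	"context"
    	"fmt"

    	"go.opentelemetry.io/otel/attribute"
    	"go.opentelemetry.io/otel/metric"
    	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    // engineMetrics holds instrument handles per instance, so two engines
    // (or two parallel tests) never share mutable package state.
    type engineMetrics struct {
    	heartbeatCounter metric.Int64Counter
    }

    func initMetrics(meter metric.Meter) (*engineMetrics, error) {
    	em := &engineMetrics{}
    	var err error
    	em.heartbeatCounter, err = meter.Int64Counter("platform_engine_heartbeat")
    	if err != nil {
    		return nil, fmt.Errorf("failed to register engine heartbeat counter: %w", err)
    	}
    	return em, nil
    }

    func main() {
    	meter := sdkmetric.NewMeterProvider().Meter("workflows-example")
    	em, err := initMetrics(meter)
    	if err != nil {
    		panic(err)
    	}
    	// Labels become OTEL attributes on each recorded data point.
    	em.heartbeatCounter.Add(context.Background(), 1,
    		metric.WithAttributes(attribute.String("workflowID", "wf-123")))
    	fmt.Println("heartbeat recorded")
    }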
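
The change to with() is easy to miss but load-bearing: every derived labeler must carry the em field forward, otherwise chained calls such as e.metrics.with(platform.KeyCapabilityID, curStep.ID) would record against zero-valued instruments. A minimal sketch of that copy-on-derive shape, with hypothetical names (instruments stands in for engineMetrics):

    package main

    import "fmt"

    // instruments stands in for the engineMetrics struct in the patch.
    type instruments struct{ name string }

    type labeler struct {
    	labels map[string]string
    	em     instruments // instrument handles travel with every derived copy
    }

    // with returns a new labeler with extra key/value labels; dropping em
    // here is exactly the omission the patch's change to with() guards against.
    func (l labeler) with(kvs ...string) labeler {
    	out := labeler{labels: make(map[string]string, len(l.labels)+len(kvs)/2), em: l.em}
    	for k, v := range l.labels {
    		out.labels[k] = v
    	}
    	for i := 0; i+1 < len(kvs); i += 2 {
    		out.labels[kvs[i]] = kvs[i+1]
    	}
    	return out
    }

    func main() {
    	base := labeler{labels: map[string]string{"workflowID": "wf-123"}, em: instruments{name: "shared"}}
    	derived := base.with("capabilityID", "cap-1")
    	fmt.Println(derived.labels, derived.em.name) // both labels present; em carried over
    }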
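
Lastly, the name → hexName rename in models.go documents existing behavior rather than changing it: NewEngine already stored hex.EncodeToString([]byte(cfg.WorkflowName)), and that hex form is what WorkflowName carries in capability request metadata. For illustration, with a hypothetical workflow name:

    package main

    import (
    	"encoding/hex"
    	"fmt"
    )

    func main() {
    	name := "my-workflow" // hypothetical cfg.WorkflowName
    	// This hex string is what consumers of WorkflowName actually see.
    	fmt.Println(hex.EncodeToString([]byte(name))) // 6d792d776f726b666c6f77
    }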