diff --git a/.changeset/tricky-candles-matter.md b/.changeset/tricky-candles-matter.md
new file mode 100644
index 0000000000..0dc7806703
--- /dev/null
+++ b/.changeset/tricky-candles-matter.md
@@ -0,0 +1,5 @@
+---
+"chainlink": patch
+---
+
+#bugfix Memory leak fix on promwrapper
diff --git a/core/services/ocr2/plugins/promwrapper/plugin.go b/core/services/ocr2/plugins/promwrapper/plugin.go
index cc6c9d135d..aa60ab8800 100644
--- a/core/services/ocr2/plugins/promwrapper/plugin.go
+++ b/core/services/ocr2/plugins/promwrapper/plugin.go
@@ -7,15 +7,23 @@ import (
 	"context"
 	"fmt"
 	"math/big"
-	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/patrickmn/go-cache"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
 )
 
+const (
+	// defaultExpiration is the default expiration time for cache items.
+	defaultExpiration = 30 * time.Minute
+
+	// defaultCleanupInterval is the default interval for cache cleanup.
+	defaultCleanupInterval = 5 * time.Minute
+)
+
 // Type assertions, buckets and labels.
 var (
 	_ types.ReportingPlugin = &promPlugin{}
@@ -160,10 +168,10 @@ type (
 		chainID                       *big.Int
 		oracleID                      string
 		configDigest                  string
-		queryEndTimes                 sync.Map
-		observationEndTimes           sync.Map
-		reportEndTimes                sync.Map
-		acceptFinalizedReportEndTimes sync.Map
+		queryEndTimes                 *cache.Cache
+		observationEndTimes           *cache.Cache
+		reportEndTimes                *cache.Cache
+		acceptFinalizedReportEndTimes *cache.Cache
 		prometheusBackend             PrometheusBackend
 	}
 )
@@ -223,13 +231,17 @@ func New(
 	}
 
 	return &promPlugin{
-		wrapped:           plugin,
-		name:              name,
-		chainType:         chainType,
-		chainID:           chainID,
-		oracleID:          fmt.Sprintf("%d", config.OracleID),
-		configDigest:      common.Bytes2Hex(config.ConfigDigest[:]),
-		prometheusBackend: prometheusBackend,
+		wrapped:                       plugin,
+		name:                          name,
+		chainType:                     chainType,
+		chainID:                       chainID,
+		oracleID:                      fmt.Sprintf("%d", config.OracleID),
+		configDigest:                  common.Bytes2Hex(config.ConfigDigest[:]),
+		prometheusBackend:             prometheusBackend,
+		queryEndTimes:                 cache.New(defaultExpiration, defaultCleanupInterval),
+		observationEndTimes:           cache.New(defaultExpiration, defaultCleanupInterval),
+		reportEndTimes:                cache.New(defaultExpiration, defaultCleanupInterval),
+		acceptFinalizedReportEndTimes: cache.New(defaultExpiration, defaultCleanupInterval),
 	}
 }
 
@@ -238,7 +250,7 @@ func (p *promPlugin) Query(ctx context.Context, timestamp types.ReportTimestamp)
 	defer func() {
 		duration := float64(time.Now().UTC().Sub(start))
 		p.prometheusBackend.SetQueryDuration(getLabelsValues(p, timestamp), duration)
-		p.queryEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Query()
+		p.setEndTime(timestamp, p.queryEndTimes) // note time at end of Query()
 	}()
 
 	return p.wrapped.Query(ctx, timestamp)
@@ -249,17 +261,16 @@ func (p *promPlugin) Observation(ctx context.Context, timestamp types.ReportTime
 
 	// Report latency between Query() and Observation().
 	labelValues := getLabelsValues(p, timestamp)
-	if queryEndTime, ok := p.queryEndTimes.Load(timestamp); ok {
+	if queryEndTime, ok := p.queryEndTimes.Get(timestampToKey(timestamp)); ok {
 		latency := float64(start.Sub(queryEndTime.(time.Time)))
 		p.prometheusBackend.SetQueryToObservationLatency(labelValues, latency)
-		p.queryEndTimes.Delete(timestamp)
 	}
 
 	// Report latency for Observation() at end of call.
 	defer func() {
 		duration := float64(time.Now().UTC().Sub(start))
 		p.prometheusBackend.SetObservationDuration(labelValues, duration)
-		p.observationEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Observe()
+		p.setEndTime(timestamp, p.observationEndTimes) // note time at end of Observe()
 	}()
 
 	return p.wrapped.Observation(ctx, timestamp, query)
@@ -270,17 +281,16 @@ func (p *promPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp
 
 	// Report latency between Observation() and Report().
 	labelValues := getLabelsValues(p, timestamp)
-	if observationEndTime, ok := p.observationEndTimes.Load(timestamp); ok {
+	if observationEndTime, ok := p.observationEndTimes.Get(timestampToKey(timestamp)); ok {
 		latency := float64(start.Sub(observationEndTime.(time.Time)))
 		p.prometheusBackend.SetObservationToReportLatency(labelValues, latency)
-		p.observationEndTimes.Delete(timestamp)
 	}
 
 	// Report latency for Report() at end of call.
 	defer func() {
 		duration := float64(time.Now().UTC().Sub(start))
 		p.prometheusBackend.SetReportDuration(labelValues, duration)
-		p.reportEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Report()
+		p.setEndTime(timestamp, p.reportEndTimes) // note time at end of Report()
 	}()
 
 	return p.wrapped.Report(ctx, timestamp, query, observations)
@@ -291,17 +301,16 @@ func (p *promPlugin) ShouldAcceptFinalizedReport(ctx context.Context, timestamp
 
 	// Report latency between Report() and ShouldAcceptFinalizedReport().
 	labelValues := getLabelsValues(p, timestamp)
-	if reportEndTime, ok := p.reportEndTimes.Load(timestamp); ok {
+	if reportEndTime, ok := p.reportEndTimes.Get(timestampToKey(timestamp)); ok {
 		latency := float64(start.Sub(reportEndTime.(time.Time)))
 		p.prometheusBackend.SetReportToAcceptFinalizedReportLatency(labelValues, latency)
-		p.reportEndTimes.Delete(timestamp)
 	}
 
 	// Report latency for ShouldAcceptFinalizedReport() at end of call.
 	defer func() {
 		duration := float64(time.Now().UTC().Sub(start))
 		p.prometheusBackend.SetShouldAcceptFinalizedReportDuration(labelValues, duration)
-		p.acceptFinalizedReportEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of ShouldAcceptFinalizedReport()
+		p.setEndTime(timestamp, p.acceptFinalizedReportEndTimes) // note time at end of ShouldAcceptFinalizedReport()
 	}()
 
 	return p.wrapped.ShouldAcceptFinalizedReport(ctx, timestamp, report)
@@ -312,10 +321,9 @@ func (p *promPlugin) ShouldTransmitAcceptedReport(ctx context.Context, timestamp
 
 	// Report latency between ShouldAcceptFinalizedReport() and ShouldTransmitAcceptedReport().
 	labelValues := getLabelsValues(p, timestamp)
-	if acceptFinalizedReportEndTime, ok := p.acceptFinalizedReportEndTimes.Load(timestamp); ok {
+	if acceptFinalizedReportEndTime, ok := p.acceptFinalizedReportEndTimes.Get(timestampToKey(timestamp)); ok {
 		latency := float64(start.Sub(acceptFinalizedReportEndTime.(time.Time)))
 		p.prometheusBackend.SetAcceptFinalizedReportToTransmitAcceptedReportLatency(labelValues, latency)
-		p.acceptFinalizedReportEndTimes.Delete(timestamp)
 	}
 
 	defer func() {
@@ -343,3 +351,11 @@ func (p *promPlugin) Close() error {
 
 	return p.wrapped.Close()
 }
+
+func (p *promPlugin) setEndTime(timestamp types.ReportTimestamp, cache *cache.Cache) {
+	cache.SetDefault(timestampToKey(timestamp), time.Now().UTC())
+}
+
+func timestampToKey(timestamp types.ReportTimestamp) string {
+	return fmt.Sprintf("%x_%d_%d", timestamp.ConfigDigest[:], timestamp.Epoch, timestamp.Round)
+}
diff --git a/core/services/ocr2/plugins/promwrapper/plugin_test.go b/core/services/ocr2/plugins/promwrapper/plugin_test.go
index b4de7f027f..5b8187405f 100644
--- a/core/services/ocr2/plugins/promwrapper/plugin_test.go
+++ b/core/services/ocr2/plugins/promwrapper/plugin_test.go
@@ -69,12 +69,12 @@ func TestPlugin_MustInstantiate(t *testing.T) {
 	// Ensure instantiation without panic for no override backend.
 	var reportingPlugin = &fakeReportingPlugin{}
 	promPlugin := New(reportingPlugin, "test", "EVM", big.NewInt(1), types.ReportingPluginConfig{}, nil)
-	require.NotEqual(t, nil, promPlugin)
+	require.NotNil(t, promPlugin)
 
 	// Ensure instantiation without panic for override provided.
 	backend := mocks.NewPrometheusBackend(t)
 	promPlugin = New(reportingPlugin, "test-2", "EVM", big.NewInt(1), types.ReportingPluginConfig{}, backend)
-	require.NotEqual(t, nil, promPlugin)
+	require.NotNil(t, promPlugin)
 }
 
 func TestPlugin_GetLatencies(t *testing.T) {
@@ -194,45 +194,37 @@ func TestPlugin_GetLatencies(t *testing.T) {
 		types.ReportingPluginConfig{ConfigDigest: reportTimestamp.ConfigDigest},
 		backend,
 	).(*promPlugin)
-	require.NotEqual(t, nil, promPlugin)
+	require.NotNil(t, promPlugin)
 
 	ctx := testutils.Context(t)
 
 	// Run OCR methods.
 	_, err := promPlugin.Query(ctx, reportTimestamp)
 	require.NoError(t, err)
-	_, ok := promPlugin.queryEndTimes.Load(reportTimestamp)
-	require.Equal(t, true, ok)
+	_, ok := promPlugin.queryEndTimes.Get(timestampToKey(reportTimestamp))
+	require.True(t, ok)
 	time.Sleep(qToOLatency)
 
 	_, err = promPlugin.Observation(ctx, reportTimestamp, nil)
 	require.NoError(t, err)
-	_, ok = promPlugin.queryEndTimes.Load(reportTimestamp)
-	require.Equal(t, false, ok)
-	_, ok = promPlugin.observationEndTimes.Load(reportTimestamp)
-	require.Equal(t, true, ok)
+	_, ok = promPlugin.observationEndTimes.Get(timestampToKey(reportTimestamp))
+	require.True(t, ok)
 	time.Sleep(oToRLatency)
 
 	_, _, err = promPlugin.Report(ctx, reportTimestamp, nil, nil)
 	require.NoError(t, err)
-	_, ok = promPlugin.observationEndTimes.Load(reportTimestamp)
-	require.Equal(t, false, ok)
-	_, ok = promPlugin.reportEndTimes.Load(reportTimestamp)
-	require.Equal(t, true, ok)
+	_, ok = promPlugin.reportEndTimes.Get(timestampToKey(reportTimestamp))
+	require.True(t, ok)
 	time.Sleep(rToALatency)
 
 	_, err = promPlugin.ShouldAcceptFinalizedReport(ctx, reportTimestamp, nil)
 	require.NoError(t, err)
-	_, ok = promPlugin.reportEndTimes.Load(reportTimestamp)
-	require.Equal(t, false, ok)
-	_, ok = promPlugin.acceptFinalizedReportEndTimes.Load(reportTimestamp)
-	require.Equal(t, true, ok)
+	_, ok = promPlugin.acceptFinalizedReportEndTimes.Get(timestampToKey(reportTimestamp))
+	require.True(t, ok)
 	time.Sleep(aToTLatency)
 
 	_, err = promPlugin.ShouldTransmitAcceptedReport(ctx, reportTimestamp, nil)
 	require.NoError(t, err)
-	_, ok = promPlugin.acceptFinalizedReportEndTimes.Load(reportTimestamp)
-	require.Equal(t, false, ok)
 
 	// Close.
 	err = promPlugin.Close()
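Note (not part of the patch): a minimal, self-contained sketch of the go-cache pattern this diff adopts, with made-up example values. With sync.Map, an end time stored in one OCR phase was only deleted when the next phase ran for the same round, so abandoned rounds accumulated entries forever; go-cache evicts entries after the default expiration via its background janitor, and because its keys are strings the patch adds the timestampToKey helper.

package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

func main() {
	// Entries expire 30 minutes after they are written; a background janitor
	// sweeps expired entries every 5 minutes, so timestamps for rounds that
	// never reach the next OCR phase are still reclaimed.
	endTimes := cache.New(30*time.Minute, 5*time.Minute)

	// go-cache keys are strings, so a composite key is derived from the
	// report timestamp fields (mirroring timestampToKey in the patch).
	// The digest bytes, epoch, and round below are hypothetical example values.
	key := fmt.Sprintf("%x_%d_%d", []byte{0xde, 0xad, 0xbe, 0xef}, uint32(3), uint8(7))

	endTimes.SetDefault(key, time.Now().UTC()) // store with the default expiration

	if v, ok := endTimes.Get(key); ok {
		fmt.Println("phase end time:", v.(time.Time))
	}
}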