From 6c7f5ce123d6de9aeddbcb014a919ddf7f3c8cb2 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 15 Jan 2025 13:51:31 +0800 Subject: [PATCH] feat: add sync cache metrics (#3510) Signed-off-by: Song Gao --- internal/server/promMetrics/metrcis.go | 7 +++- internal/topo/node/cache/sync_cache.go | 27 ++++++++++++++++ metrics/sync_cache.go | 45 ++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 metrics/sync_cache.go diff --git a/internal/server/promMetrics/metrcis.go b/internal/server/promMetrics/metrcis.go index 2efeb5ee65..8b7dd7a7aa 100644 --- a/internal/server/promMetrics/metrcis.go +++ b/internal/server/promMetrics/metrcis.go @@ -14,7 +14,11 @@ package promMetrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/lf-edge/ekuiper/metrics" +) const ( LblStatusType = "status" @@ -47,6 +51,7 @@ func InitServerMetrics() { func RegisterMetrics() { InitServerMetrics() + metrics.RegisterSyncCacheMetrics() prometheus.MustRegister(RuleStatusCountGauge) prometheus.MustRegister(RuleStatusGauge) } diff --git a/internal/topo/node/cache/sync_cache.go b/internal/topo/node/cache/sync_cache.go index 3e2d0063d7..fd40d57b6a 100644 --- a/internal/topo/node/cache/sync_cache.go +++ b/internal/topo/node/cache/sync_cache.go @@ -21,11 +21,21 @@ import ( "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/pkg/store" + "github.com/lf-edge/ekuiper/metrics" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/infra" "github.com/lf-edge/ekuiper/pkg/kv" ) +const ( + addLbl = "add" + sendLbl = "send" + delLbl = "del" + ackLbl = "ack" + loadLbl = "load" + flushLbl = "flush" +) + // page Rotates storage for in memory cache // Not thread safe! type page struct { @@ -92,6 +102,8 @@ func (p *page) reset() { } type SyncCache struct { + ruleID string + opID string // The input data to the cache in <-chan []map[string]interface{} Out chan []map[string]interface{} @@ -119,6 +131,8 @@ type SyncCache struct { func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache { c := NewSyncCache(ctx, in, errCh, cacheConf, bufferLength) c.exitCh = exitCh + c.ruleID = ctx.GetRuleId() + c.opID = ctx.GetOpId() return c } @@ -171,6 +185,7 @@ func (c *SyncCache) run(ctx api.StreamContext) { c.send(ctx) } case isSuccess := <-c.Ack: + metrics.SyncCacheOpCnter.WithLabelValues(ackLbl, c.ruleID, c.opID).Inc() // only send the next sink after receiving an ack ctx.GetLogger().Debugf("cache ack") if isSuccess { @@ -194,6 +209,7 @@ func (c *SyncCache) run(ctx api.StreamContext) { } func (c *SyncCache) send(ctx api.StreamContext) { + metrics.SyncCacheOpCnter.WithLabelValues(sendLbl, c.ruleID, c.opID).Inc() if c.CacheLength > 1 && c.cacheConf.ResendInterval > 0 { time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond) } @@ -215,6 +231,7 @@ func (c *SyncCache) send(ctx api.StreamContext) { // addCache not thread safe! func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) { + metrics.SyncCacheOpCnter.WithLabelValues(addLbl, c.ruleID, c.opID).Inc() isNotFull := c.appendMemCache(item) if !isNotFull { if c.diskBufferPage == nil { @@ -227,6 +244,10 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{ c.loadFromDisk(ctx) ctx.GetLogger().Debug("disk full, remove the last page") } + start := time.Now() + defer func() { + metrics.SyncCacheDurationHist.WithLabelValues(flushLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds())) + }() err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage) if err != nil { ctx.GetLogger().Errorf("fail to store disk cache %v", err) @@ -258,6 +279,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{ // deleteCache not thread safe! func (c *SyncCache) deleteCache(ctx api.StreamContext) { + metrics.SyncCacheOpCnter.WithLabelValues(delLbl, c.ruleID, c.opID).Inc() ctx.GetLogger().Debugf("deleting cache. CacheLength: %d, diskSize: %d", c.CacheLength, c.diskSize) if len(c.memCache) == 0 { ctx.GetLogger().Debug("mem cache is empty") @@ -282,6 +304,11 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) { } func (c *SyncCache) loadFromDisk(ctx api.StreamContext) { + metrics.SyncCacheOpCnter.WithLabelValues(loadLbl, c.ruleID, c.opID).Inc() + start := time.Now() + defer func() { + metrics.SyncCacheDurationHist.WithLabelValues(loadLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds())) + }() // load page from the disk ctx.GetLogger().Debugf("loading from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize) hotPage := newPage(c.cacheConf.BufferPageSize) diff --git a/metrics/sync_cache.go b/metrics/sync_cache.go new file mode 100644 index 0000000000..1b2999bb9a --- /dev/null +++ b/metrics/sync_cache.go @@ -0,0 +1,45 @@ +// Copyright 2025 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const ( + LblType = "type" + LblRuleIDType = "rule" + LblOpIDType = "op" +) + +var ( + SyncCacheDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "kuiper", + Subsystem: "sync_cache", + Name: "duration", + Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s + Help: "hist of sync cache", + }, []string{LblType, LblRuleIDType, LblOpIDType}) + + SyncCacheOpCnter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "kuiper", + Subsystem: "sync_cache", + Name: "counter", + Help: "counter of sync cache", + }, []string{LblType, LblRuleIDType, LblOpIDType}) +) + +func RegisterSyncCacheMetrics() { + prometheus.MustRegister(SyncCacheOpCnter) + prometheus.MustRegister(SyncCacheDurationHist) +}