Skip to content

Commit

Permalink
feat: add sync cache metrics (#3510)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Jan 15, 2025
1 parent b9b5af5 commit 6c7f5ce
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
7 changes: 6 additions & 1 deletion internal/server/promMetrics/metrcis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package promMetrics

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"

"github.com/lf-edge/ekuiper/metrics"
)

const (
LblStatusType = "status"
Expand Down Expand Up @@ -47,6 +51,7 @@ func InitServerMetrics() {

func RegisterMetrics() {
InitServerMetrics()
metrics.RegisterSyncCacheMetrics()
prometheus.MustRegister(RuleStatusCountGauge)
prometheus.MustRegister(RuleStatusGauge)
}
Expand Down
27 changes: 27 additions & 0 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ import (

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/metrics"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/infra"
"github.com/lf-edge/ekuiper/pkg/kv"
)

const (
addLbl = "add"
sendLbl = "send"
delLbl = "del"
ackLbl = "ack"
loadLbl = "load"
flushLbl = "flush"
)

// page Rotates storage for in memory cache
// Not thread safe!
type page struct {
Expand Down Expand Up @@ -92,6 +102,8 @@ func (p *page) reset() {
}

type SyncCache struct {
ruleID string
opID string
// The input data to the cache
in <-chan []map[string]interface{}
Out chan []map[string]interface{}
Expand Down Expand Up @@ -119,6 +131,8 @@ type SyncCache struct {
func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
c := NewSyncCache(ctx, in, errCh, cacheConf, bufferLength)
c.exitCh = exitCh
c.ruleID = ctx.GetRuleId()
c.opID = ctx.GetOpId()
return c
}

Expand Down Expand Up @@ -171,6 +185,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
c.send(ctx)
}
case isSuccess := <-c.Ack:
metrics.SyncCacheOpCnter.WithLabelValues(ackLbl, c.ruleID, c.opID).Inc()
// only send the next sink after receiving an ack
ctx.GetLogger().Debugf("cache ack")
if isSuccess {
Expand All @@ -194,6 +209,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
}

func (c *SyncCache) send(ctx api.StreamContext) {
metrics.SyncCacheOpCnter.WithLabelValues(sendLbl, c.ruleID, c.opID).Inc()
if c.CacheLength > 1 && c.cacheConf.ResendInterval > 0 {
time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
}
Expand All @@ -215,6 +231,7 @@ func (c *SyncCache) send(ctx api.StreamContext) {

// addCache not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
metrics.SyncCacheOpCnter.WithLabelValues(addLbl, c.ruleID, c.opID).Inc()
isNotFull := c.appendMemCache(item)
if !isNotFull {
if c.diskBufferPage == nil {
Expand All @@ -227,6 +244,10 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
c.loadFromDisk(ctx)
ctx.GetLogger().Debug("disk full, remove the last page")
}
start := time.Now()
defer func() {
metrics.SyncCacheDurationHist.WithLabelValues(flushLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds()))
}()
err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
if err != nil {
ctx.GetLogger().Errorf("fail to store disk cache %v", err)
Expand Down Expand Up @@ -258,6 +279,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{

// deleteCache not thread safe!
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
metrics.SyncCacheOpCnter.WithLabelValues(delLbl, c.ruleID, c.opID).Inc()
ctx.GetLogger().Debugf("deleting cache. CacheLength: %d, diskSize: %d", c.CacheLength, c.diskSize)
if len(c.memCache) == 0 {
ctx.GetLogger().Debug("mem cache is empty")
Expand All @@ -282,6 +304,11 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) {
}

func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
metrics.SyncCacheOpCnter.WithLabelValues(loadLbl, c.ruleID, c.opID).Inc()
start := time.Now()
defer func() {
metrics.SyncCacheDurationHist.WithLabelValues(loadLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds()))
}()
// load page from the disk
ctx.GetLogger().Debugf("loading from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
hotPage := newPage(c.cacheConf.BufferPageSize)
Expand Down
45 changes: 45 additions & 0 deletions metrics/sync_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import "github.com/prometheus/client_golang/prometheus"

const (
LblType = "type"
LblRuleIDType = "rule"
LblOpIDType = "op"
)

var (
SyncCacheDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "sync_cache",
Name: "duration",
Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s
Help: "hist of sync cache",
}, []string{LblType, LblRuleIDType, LblOpIDType})

SyncCacheOpCnter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "kuiper",
Subsystem: "sync_cache",
Name: "counter",
Help: "counter of sync cache",
}, []string{LblType, LblRuleIDType, LblOpIDType})
)

func RegisterSyncCacheMetrics() {
prometheus.MustRegister(SyncCacheOpCnter)
prometheus.MustRegister(SyncCacheDurationHist)
}

0 comments on commit 6c7f5ce

Please sign in to comment.