diff --git a/config/config.go b/config/config.go index d4d92ac5..d0443f75 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,7 @@ type Config struct { Assignment Assignment LogLevel string LogTrace bool + RecordPoolSize int64 ReloadSeriesMapInterval int ActiveSeriesRange int @@ -340,6 +341,9 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr } } } + if cfg.RecordPoolSize == 0 { + cfg.RecordPoolSize = MaxBufferSize + } switch strings.ToLower(cfg.LogLevel) { case "debug", "info", "warn", "error", "dpanic", "panic", "fatal": default: diff --git a/docs/configuration/config.md b/docs/configuration/config.md index b624cc71..22c1c735 100644 --- a/docs/configuration/config.md +++ b/docs/configuration/config.md @@ -199,6 +199,9 @@ // utilization, only active series from the last "activeSeriesRange" seconds will be cached, and the map in the cache will be updated every // "reloadSeriesMapInterval" seconds. By default, series from the last 24 hours will be cached, and the cache will be updated every hour. "reloadSeriesMapInterval": 3600, - "activeSeriesRange": 86400 + "activeSeriesRange": 86400, + "logTrace": false, + // It is recommended that recordPoolSize be 3 or 4 times the bufferSize, for the backpressure mechanism, to avoid using too much memory. + "recordPoolSize": 1048576 } ``` diff --git a/input/kafka_franz.go b/input/kafka_franz.go index 8841c683..cc5f6740 100644 --- a/input/kafka_franz.go +++ b/input/kafka_franz.go @@ -19,7 +19,6 @@ import ( "context" "crypto/tls" "fmt" - "strconv" "strings" "sync" "time" @@ -181,6 +180,9 @@ func (k *KafkaFranz) Run() { defer k.wgRun.Done() LOOP: for { + if !util.Rs.Allow() { + continue + } traceId := util.GenTraceId() util.LogTrace(traceId, util.TraceKindFetchStart, zap.String("consumer group", k.grpConfig.Name), zap.Int("buffersize", k.grpConfig.BufferSize)) fetches := k.cl.PollRecords(k.ctx, k.grpConfig.BufferSize) @@ -192,7 +194,9 @@ LOOP: err = errors.Wrapf(err, "") util.Logger.Info("kgo.Client.PollFetchs() got an error", zap.Error(err)) } - util.LogTrace(traceId, util.TraceKindFetchEnd, zap.String("consumer group", k.grpConfig.Name), zap.String("records", strconv.Itoa(fetches.NumRecords()))) + fetchRecords := fetches.NumRecords() + util.Rs.Inc(int64(fetchRecords)) + util.LogTrace(traceId, util.TraceKindFetchEnd, zap.String("consumer group", k.grpConfig.Name), zap.Int64("records", int64(fetchRecords))) // Automatically end the program if it remains inactive for a specific duration of time. t := time.NewTimer(processTimeOut * time.Minute) select { diff --git a/output/clickhouse.go b/output/clickhouse.go index e20d7afa..d246c8d3 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -142,6 +142,7 @@ func (c *ClickHouse) Send(batch *model.Batch, traceId string) { statistics.WritingPoolBacklog.WithLabelValues(c.taskCfg.Name).Dec() }); err != nil { batch.Wg.Done() + util.Rs.Dec(int64(batch.RealSize)) return } @@ -256,6 +257,7 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, sc *pool.ShardConn, traceId s util.LogTrace(traceId, util.TraceKindWriteStart, zap.Int("realsize", batch.RealSize)) defer func() { + util.Rs.Dec(int64(batch.RealSize)) util.LogTrace(traceId, util.TraceKindWriteEnd, zap.Int("success", batch.RealSize)) }() times := c.cfg.Clickhouse.RetryTimes diff --git a/task/consumer.go b/task/consumer.go index de798dfc..259dd117 100644 --- a/task/consumer.go +++ b/task/consumer.go @@ -245,6 +245,8 @@ func (c *Consumer) processFetch() { if e := tsk.Put(msg, traceId, flushFn); e != nil { atomic.StoreInt64(&done, items) err = e + // decrise the error record + util.Rs.Dec(1) return false } } diff --git a/task/sinker.go b/task/sinker.go index 0682b9b1..e2b76db1 100644 --- a/task/sinker.go +++ b/task/sinker.go @@ -283,6 +283,8 @@ func (s *Sinker) stopAllTasks() { func (s *Sinker) applyConfig(newCfg *config.Config) (err error) { util.SetLogLevel(newCfg.LogLevel) util.SetLogTrace(newCfg.LogTrace) + util.Rs.SetPoolSize(newCfg.RecordPoolSize) + util.Rs.Reset() if s.curCfg == nil { // The first time invoking of applyConfig err = s.applyFirstConfig(newCfg) diff --git a/util/recordpoolsize.go b/util/recordpoolsize.go new file mode 100644 index 00000000..487b70b5 --- /dev/null +++ b/util/recordpoolsize.go @@ -0,0 +1,35 @@ +package util + +import "sync/atomic" + +type RecordSize struct { + poolSize int64 + realSize int64 +} + +func (rs *RecordSize) SetPoolSize(size int64) { + rs.poolSize = size +} + +func (rs *RecordSize) Inc(size int64) { + atomic.AddInt64(&rs.realSize, size) +} + +func (rs *RecordSize) Reset() { + atomic.StoreInt64(&rs.realSize, 0) +} + +func (rs *RecordSize) Dec(size int64) { + atomic.AddInt64(&rs.realSize, size*(-1)) +} + +func (rs *RecordSize) Get() int64 { + return atomic.LoadInt64(&rs.realSize) +} + +func (rs *RecordSize) Allow() bool { + realSize := atomic.LoadInt64(&rs.realSize) + return realSize < rs.poolSize +} + +var Rs RecordSize