From 016f8039083998473b3b8c35a2feefa8222cd9c8 Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Tue, 11 Jun 2024 13:48:28 +0800 Subject: [PATCH] feat: do not restart consumer when foundNewKeys to avoid rebalance --- task/task.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index a0865046..1ff8f349 100644 --- a/task/task.go +++ b/task/task.go @@ -190,8 +190,10 @@ func (service *Service) Put(msg *model.InputMessage, traceId string, flushFn fun // 2) flush the shards // 3) apply the schema change. // 4) recreate the service - util.Logger.Warn("new key detected, consumer is going to restart", zap.String("consumer group", service.taskCfg.ConsumerGroup), zap.Error(err)) - go service.consumer.restart() + if len(service.consumer.grpConfig.Configs) > 1 { + util.Logger.Warn("new key detected, consumer is going to restart", zap.String("consumer group", service.taskCfg.ConsumerGroup), zap.Error(err)) + go service.consumer.restart() + } flushFn(traceId, "foundNewKeys and restart") if err = service.clickhouse.ChangeSchema(&service.newKeys); err != nil { util.Logger.Fatal("clickhouse.ChangeSchema failed", zap.String("task", taskCfg.Name), zap.Error(err))