From f102fa8965a3ba721c90127499cb994365082398 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 14 Feb 2025 10:32:14 +0800 Subject: [PATCH] fix: Add and use lifetime context for compaction trigger (#39857) Related to #39856 This PR add lifetime bound context for compaction trigger and use it instead of context.Background in case of rootcoord down and some grpc call retry forever --------- Signed-off-by: Congqi Xia --- .../datacoord/compaction_policy_clustering.go | 3 +- .../compaction_policy_clustering_test.go | 4 +-- .../datacoord/compaction_policy_single.go | 3 +- .../compaction_policy_single_test.go | 2 +- internal/datacoord/compaction_trigger_v2.go | 32 ++++++++++--------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index fd0b0295ad06b..7ddca486bae6d 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -49,9 +49,8 @@ func (policy *clusteringCompactionPolicy) Enable() bool { Params.DataCoordCfg.ClusteringCompactionAutoEnable.GetAsBool() } -func (policy *clusteringCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) { +func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) { log.Info("start trigger clusteringCompactionPolicy...") - ctx := context.Background() collections := policy.meta.GetCollections() events := make(map[CompactionTriggerType][]CompactionView, 0) diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index c43c69afb990a..5617c0d5cbb8b 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -99,7 +99,7 @@ func (s *ClusteringCompactionPolicySuite) TestEnable() { func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() { // trigger with no collections - events, err := s.clusteringCompactionPolicy.Trigger() + events, err := s.clusteringCompactionPolicy.Trigger(context.Background()) s.NoError(err) gotViews, ok := events[TriggerTypeClustering] s.True(ok) @@ -132,7 +132,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() { }) // trigger - events, err := s.clusteringCompactionPolicy.Trigger() + events, err := s.clusteringCompactionPolicy.Trigger(context.Background()) s.NoError(err) gotViews, ok := events[TriggerTypeClustering] s.True(ok) diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index edf1c4dbc2af0..a7b8549f31adb 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -46,8 +46,7 @@ func (policy *singleCompactionPolicy) Enable() bool { return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() } -func (policy *singleCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) { - ctx := context.Background() +func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) { collections := policy.meta.GetCollections() events := make(map[CompactionTriggerType][]CompactionView, 0) diff --git a/internal/datacoord/compaction_policy_single_test.go b/internal/datacoord/compaction_policy_single_test.go index 064ef6afb8049..bc1aad4abc4ff 100644 --- a/internal/datacoord/compaction_policy_single_test.go +++ b/internal/datacoord/compaction_policy_single_test.go @@ -65,7 +65,7 @@ func (s *SingleCompactionPolicySuite) SetupTest() { } func (s *SingleCompactionPolicySuite) TestTrigger() { - events, err := s.singlePolicy.Trigger() + events, err := s.singlePolicy.Trigger(context.Background()) s.NoError(err) gotViews, ok := events[TriggerTypeSingle] s.True(ok) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 7bed1f5c9d209..1a48dac610afc 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -86,8 +86,8 @@ type CompactionTriggerManager struct { clusteringPolicy *clusteringCompactionPolicy singlePolicy *singleCompactionPolicy - closeSig chan struct{} - closeWg sync.WaitGroup + cancel context.CancelFunc + closeWg sync.WaitGroup } func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta) *CompactionTriggerManager { @@ -96,7 +96,6 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com handler: handler, compactionHandler: compactionHandler, meta: meta, - closeSig: make(chan struct{}), } m.l0Policy = newL0CompactionPolicy(meta) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) @@ -112,19 +111,25 @@ func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) { func (m *CompactionTriggerManager) Start() { m.closeWg.Add(1) - go m.startLoop() + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + go func() { + defer m.closeWg.Done() + m.loop(ctx) + }() } func (m *CompactionTriggerManager) Stop() { - close(m.closeSig) + if m.cancel != nil { + m.cancel() + } m.closeWg.Wait() } -func (m *CompactionTriggerManager) startLoop() { +func (m *CompactionTriggerManager) loop(ctx context.Context) { defer logutil.LogPanic() - defer m.closeWg.Done() - log := log.Ctx(context.TODO()) + log := log.Ctx(ctx) l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second)) defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) @@ -134,7 +139,7 @@ func (m *CompactionTriggerManager) startLoop() { log.Info("Compaction trigger manager start") for { select { - case <-m.closeSig: + case <-ctx.Done(): log.Info("Compaction trigger manager checkLoop quit") return case <-l0Ticker.C: @@ -150,7 +155,6 @@ func (m *CompactionTriggerManager) startLoop() { log.Warn("Fail to trigger L0 policy", zap.Error(err)) continue } - ctx := context.Background() if len(events) > 0 { for triggerType, views := range events { m.notify(ctx, triggerType, views) @@ -164,12 +168,11 @@ func (m *CompactionTriggerManager) startLoop() { log.RatedInfo(10, "Skip trigger clustering compaction since compactionHandler is full") continue } - events, err := m.clusteringPolicy.Trigger() + events, err := m.clusteringPolicy.Trigger(ctx) if err != nil { log.Warn("Fail to trigger clustering policy", zap.Error(err)) continue } - ctx := context.Background() if len(events) > 0 { for triggerType, views := range events { m.notify(ctx, triggerType, views) @@ -183,12 +186,11 @@ func (m *CompactionTriggerManager) startLoop() { log.RatedInfo(10, "Skip trigger single compaction since compactionHandler is full") continue } - events, err := m.singlePolicy.Trigger() + events, err := m.singlePolicy.Trigger(ctx) if err != nil { log.Warn("Fail to trigger single policy", zap.Error(err)) continue } - ctx := context.Background() if len(events) > 0 { for triggerType, views := range events { m.notify(ctx, triggerType, views) @@ -200,7 +202,7 @@ func (m *CompactionTriggerManager) startLoop() { func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) { log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID)) - views, triggerID, err := m.clusteringPolicy.triggerOneCollection(context.Background(), collectionID, true) + views, triggerID, err := m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true) if err != nil { return 0, err }