Skip to content

Commit

Permalink
fix: Add and use lifetime context for compaction trigger (#39857)
Browse files Browse the repository at this point in the history
Related to #39856

This PR add lifetime bound context for compaction trigger and use it
instead of context.Background in case of rootcoord down and some grpc
call retry forever

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Feb 14, 2025
1 parent 9407a3c commit f102fa8
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
3 changes: 1 addition & 2 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ func (policy *clusteringCompactionPolicy) Enable() bool {
Params.DataCoordCfg.ClusteringCompactionAutoEnable.GetAsBool()
}

func (policy *clusteringCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
log.Info("start trigger clusteringCompactionPolicy...")
ctx := context.Background()
collections := policy.meta.GetCollections()

events := make(map[CompactionTriggerType][]CompactionView, 0)
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_policy_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *ClusteringCompactionPolicySuite) TestEnable() {

func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() {
// trigger with no collections
events, err := s.clusteringCompactionPolicy.Trigger()
events, err := s.clusteringCompactionPolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeClustering]
s.True(ok)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() {
})

// trigger
events, err := s.clusteringCompactionPolicy.Trigger()
events, err := s.clusteringCompactionPolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeClustering]
s.True(ok)
Expand Down
3 changes: 1 addition & 2 deletions internal/datacoord/compaction_policy_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func (policy *singleCompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

func (policy *singleCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
ctx := context.Background()
func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
collections := policy.meta.GetCollections()

events := make(map[CompactionTriggerType][]CompactionView, 0)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_policy_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *SingleCompactionPolicySuite) SetupTest() {
}

func (s *SingleCompactionPolicySuite) TestTrigger() {
events, err := s.singlePolicy.Trigger()
events, err := s.singlePolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeSingle]
s.True(ok)
Expand Down
32 changes: 17 additions & 15 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type CompactionTriggerManager struct {
clusteringPolicy *clusteringCompactionPolicy
singlePolicy *singleCompactionPolicy

closeSig chan struct{}
closeWg sync.WaitGroup
cancel context.CancelFunc
closeWg sync.WaitGroup
}

func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta) *CompactionTriggerManager {
Expand All @@ -96,7 +96,6 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com
handler: handler,
compactionHandler: compactionHandler,
meta: meta,
closeSig: make(chan struct{}),
}
m.l0Policy = newL0CompactionPolicy(meta)
m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler)
Expand All @@ -112,19 +111,25 @@ func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) {

func (m *CompactionTriggerManager) Start() {
m.closeWg.Add(1)
go m.startLoop()
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
go func() {
defer m.closeWg.Done()
m.loop(ctx)
}()
}

func (m *CompactionTriggerManager) Stop() {
close(m.closeSig)
if m.cancel != nil {
m.cancel()
}
m.closeWg.Wait()
}

func (m *CompactionTriggerManager) startLoop() {
func (m *CompactionTriggerManager) loop(ctx context.Context) {
defer logutil.LogPanic()
defer m.closeWg.Done()

log := log.Ctx(context.TODO())
log := log.Ctx(ctx)
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
Expand All @@ -134,7 +139,7 @@ func (m *CompactionTriggerManager) startLoop() {
log.Info("Compaction trigger manager start")
for {
select {
case <-m.closeSig:
case <-ctx.Done():
log.Info("Compaction trigger manager checkLoop quit")
return
case <-l0Ticker.C:
Expand All @@ -150,7 +155,6 @@ func (m *CompactionTriggerManager) startLoop() {
log.Warn("Fail to trigger L0 policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
Expand All @@ -164,12 +168,11 @@ func (m *CompactionTriggerManager) startLoop() {
log.RatedInfo(10, "Skip trigger clustering compaction since compactionHandler is full")
continue
}
events, err := m.clusteringPolicy.Trigger()
events, err := m.clusteringPolicy.Trigger(ctx)

Check warning on line 171 in internal/datacoord/compaction_trigger_v2.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction_trigger_v2.go#L171

Added line #L171 was not covered by tests
if err != nil {
log.Warn("Fail to trigger clustering policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
Expand All @@ -183,12 +186,11 @@ func (m *CompactionTriggerManager) startLoop() {
log.RatedInfo(10, "Skip trigger single compaction since compactionHandler is full")
continue
}
events, err := m.singlePolicy.Trigger()
events, err := m.singlePolicy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger single policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
Expand All @@ -200,7 +202,7 @@ func (m *CompactionTriggerManager) startLoop() {

func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) {
log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID))
views, triggerID, err := m.clusteringPolicy.triggerOneCollection(context.Background(), collectionID, true)
views, triggerID, err := m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit f102fa8

Please sign in to comment.