fix TestImportV2 unit case
Signed-off-by: SimFG <[email protected]>
SimFG committed Feb 14, 2025
1 parent 61d4c96 commit 51dd731
Showing 27 changed files with 558 additions and 429 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -221,7 +221,7 @@ jobs:
name: Integration Test
needs: Build
runs-on: ubuntu-latest
timeout-minutes: 120
timeout-minutes: 150
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
3 changes: 1 addition & 2 deletions go.mod
@@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
@@ -286,7 +286,6 @@ replace (
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -88,8 +88,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc h1:DhqkZ5yhNm9YMOJjxhaptgH2sgXmOYYIGv7Bj9JBZlo=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
@@ -662,6 +660,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b h1:s3gdV+iYJMvo9mydBJVAEA2Uaz29eIuUnQK867U3G8I=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
148 changes: 97 additions & 51 deletions internal/datacoord/compaction_trigger_v2.go
@@ -63,8 +63,8 @@ type TriggerManager interface {
Stop()
OnCollectionUpdate(collectionID int64)
ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error)
PauseL0SegmentCompacting(collectionID int64)
ResumeL0SegmentCompacting(collectionID int64)
GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
}

var _ TriggerManager = (*CompactionTriggerManager)(nil)
@@ -90,30 +90,27 @@ type CompactionTriggerManager struct {
clusteringPolicy *clusteringCompactionPolicy
singlePolicy *singleCompactionPolicy

<<<<<<< HEAD
cancel context.CancelFunc
closeWg sync.WaitGroup
=======

l0Triggering bool
l0SigLock *sync.Mutex
l0TickSig *sync.Cond

closeSig chan struct{}
closeWg sync.WaitGroup
>>>>>>> support to replicate import msg
pauseCompactionChanMap map[int64]chan struct{}
resumeCompactionChanMap map[int64]chan struct{}
compactionChanLock sync.Mutex
}

func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta, imeta ImportMeta) *CompactionTriggerManager {
m := &CompactionTriggerManager{
allocator: alloc,
handler: handler,
compactionHandler: compactionHandler,
meta: meta,
<<<<<<< HEAD
=======
imeta: imeta,
closeSig: make(chan struct{}),
>>>>>>> support to replicate import msg
allocator: alloc,
handler: handler,
compactionHandler: compactionHandler,
meta: meta,
imeta: imeta,
pauseCompactionChanMap: make(map[int64]chan struct{}),
resumeCompactionChanMap: make(map[int64]chan struct{}),
}
m.l0SigLock = &sync.Mutex{}
m.l0TickSig = sync.NewCond(m.l0SigLock)
@@ -146,23 +143,62 @@ func (m *CompactionTriggerManager) Stop() {
m.closeWg.Wait()
}

func (m *CompactionTriggerManager) PauseL0SegmentCompacting(collectionID int64) {
func (m *CompactionTriggerManager) pauseL0SegmentCompacting(jobID, collectionID int64) {
m.l0Policy.AddSkipCollection(collectionID)
m.l0SigLock.Lock()
for m.l0Triggering {
m.l0TickSig.Wait()
}
m.l0SigLock.Unlock()
m.compactionChanLock.Lock()
if ch, ok := m.pauseCompactionChanMap[jobID]; ok {
close(ch)
}
m.compactionChanLock.Unlock()
}

func (m *CompactionTriggerManager) ResumeL0SegmentCompacting(collectionID int64) {
func (m *CompactionTriggerManager) resumeL0SegmentCompacting(jobID, collectionID int64) {
m.compactionChanLock.Lock()
m.l0Policy.RemoveSkipCollection(collectionID)
if ch, ok := m.resumeCompactionChanMap[jobID]; ok {
close(ch)
delete(m.pauseCompactionChanMap, jobID)
delete(m.resumeCompactionChanMap, jobID)
}
m.compactionChanLock.Unlock()
}

func (m *CompactionTriggerManager) GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} {
m.compactionChanLock.Lock()
defer m.compactionChanLock.Unlock()
if ch, ok := m.pauseCompactionChanMap[jobID]; ok {
return ch
}
ch := make(chan struct{})
m.pauseCompactionChanMap[jobID] = ch
go m.pauseL0SegmentCompacting(jobID, collectionID)
return ch
}

func (m *CompactionTriggerManager) GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} {
m.compactionChanLock.Lock()
defer m.compactionChanLock.Unlock()
if ch, ok := m.resumeCompactionChanMap[jobID]; ok {
return ch
}
ch := make(chan struct{})
m.resumeCompactionChanMap[jobID] = ch
go m.resumeL0SegmentCompacting(jobID, collectionID)
return ch
}

func (m *CompactionTriggerManager) setL0Triggering(b bool) {
m.l0SigLock.Lock()
defer m.l0SigLock.Unlock()
m.l0Triggering = b
if !b {
m.l0TickSig.Broadcast()
}
}

func (m *CompactionTriggerManager) loop(ctx context.Context) {
@@ -193,6 +229,7 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) {
events, err := m.l0Policy.Trigger()
if err != nil {
log.Warn("Fail to trigger L0 policy", zap.Error(err))
m.setL0Triggering(false)
continue
}
if len(events) > 0 {
@@ -303,6 +340,45 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
return
}

err = m.addL0ImportTaskForImport(ctx, collection, view)
if err != nil {
log.Warn("Failed to submit compaction view to scheduler because add l0 import task fail", zap.Error(err))
return
}

task := &datapb.CompactionTask{
TriggerID: taskID, // inner trigger, use task id as trigger id
PlanID: taskID,
Type: datapb.CompactionType_Level0DeleteCompaction,
StartTime: time.Now().Unix(),
InputSegments: levelZeroSegs,
State: datapb.CompactionTaskState_pipelining,
Channel: view.GetGroupLabel().Channel,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Schema: collection.Schema,
}

err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
log.Warn("Failed to execute compaction task",
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("planID", task.GetPlanID()),
zap.Int64s("segmentIDs", task.GetInputSegments()),
zap.Error(err))
return
}
log.Info("Finish to submit a LevelZeroCompaction plan",
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.Int64s("L0 segments", levelZeroSegs),
)
}

func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error {
// add l0 import task for the collection if the collection is importing
importJobs := m.imeta.GetJobBy(ctx, WithCollectionID(collection.ID), WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
if len(importJobs) > 0 {
@@ -356,49 +432,19 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
}, job, m.allocator, m.meta)
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
return
return err
}
for _, t := range newTasks {
err = m.imeta.AddTask(ctx, t)
if err != nil {
log.Warn("add new l0 import task from l0 compaction failed", WrapTaskLog(t, zap.Error(err))...)
return
return err
}
log.Info("add new l0 import task from l0 compaction", WrapTaskLog(t)...)
}
}
}

task := &datapb.CompactionTask{
TriggerID: taskID, // inner trigger, use task id as trigger id
PlanID: taskID,
Type: datapb.CompactionType_Level0DeleteCompaction,
StartTime: time.Now().Unix(),
InputSegments: levelZeroSegs,
State: datapb.CompactionTaskState_pipelining,
Channel: view.GetGroupLabel().Channel,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Schema: collection.Schema,
}

err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
log.Warn("Failed to execute compaction task",
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("planID", task.GetPlanID()),
zap.Int64s("segmentIDs", task.GetInputSegments()),
zap.Error(err))
return
}
log.Info("Finish to submit a LevelZeroCompaction plan",
zap.Int64("triggerID", task.GetTriggerID()),
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.Int64s("L0 segments", levelZeroSegs),
)
return nil
}

func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
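
Note: this file replaces the blocking PauseL0SegmentCompacting/ResumeL0SegmentCompacting calls with GetPauseCompactionChan/GetResumeCompactionChan, which return channels that are closed once the pause (or resume) has actually taken effect. A minimal consumer sketch follows, assuming only the two interface methods shown in this diff; the wrapper functions and package name are hypothetical and not part of the commit.

package datacoordsketch // hypothetical package, for illustration only

import "context"

// triggerManager mirrors the two channel-returning methods added to the
// TriggerManager interface in this diff.
type triggerManager interface {
	GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
	GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
}

// waitForPause blocks until in-flight L0 compaction for the collection has
// drained and the collection is skipped by the L0 policy, or until ctx is cancelled.
func waitForPause(ctx context.Context, tm triggerManager, jobID, collectionID int64) error {
	select {
	case <-tm.GetPauseCompactionChan(jobID, collectionID):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitForResume blocks until the skip flag is removed and the job's
// pause/resume bookkeeping has been deleted, or until ctx is cancelled.
func waitForResume(ctx context.Context, tm triggerManager, jobID, collectionID int64) error {
	select {
	case <-tm.GetResumeCompactionChan(jobID, collectionID):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}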
6 changes: 4 additions & 2 deletions internal/datacoord/compaction_trigger_v2_test.go
@@ -380,8 +380,10 @@ func TestCompactionAndImport(t *testing.T) {
}).Return(nil)
mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe()

triggerManager.PauseL0SegmentCompacting(100)
defer triggerManager.ResumeL0SegmentCompacting(100)
<-triggerManager.GetPauseCompactionChan(100, 10)
defer func() {
<-triggerManager.GetResumeCompactionChan(100, 10)
}()

triggerManager.Start()
defer triggerManager.Stop()
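
Note: because the test now blocks on the returned channels, a regression in the pause/resume bookkeeping would hang the test rather than fail it. A small helper sketch for guarding those reads with a timeout follows; the helper name and the timeout value are assumptions, not part of this commit.

package datacoordsketch // hypothetical package, for illustration only

import (
	"testing"
	"time"
)

// waitOrFail reads from a pause/resume channel but fails the test after a
// timeout instead of hanging, e.g.:
//	waitOrFail(t, triggerManager.GetPauseCompactionChan(100, 10), "pause")
func waitOrFail(t *testing.T, ch <-chan struct{}, what string) {
	t.Helper()
	select {
	case <-ch:
	case <-time.After(30 * time.Second):
		t.Fatalf("timed out waiting for %s channel to close", what)
	}
}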
68 changes: 45 additions & 23 deletions internal/datacoord/import_checker.go
@@ -96,7 +96,7 @@ func (c *importChecker) Start() {
log.Info("waiting for all channels to send signals",
zap.Strings("vchannels", job.GetVchannels()),
zap.Strings("readyVchannels", job.GetReadyVchannels()),
zap.Int64("jobID", job.GetJobID())) // TODO fubang
zap.Int64("jobID", job.GetJobID()))
continue
}
switch job.GetState() {
@@ -215,7 +215,6 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForPreImports(job)
if len(lacks) == 0 {
log.Info("import job start to preimport") // TODO fubang
return
}
fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
@@ -393,21 +392,54 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
return
}
buildIndexDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))

// wait l0 segment import and block l0 compaction
c.l0CompactionTrigger.PauseL0SegmentCompacting(job.GetCollectionID())
defer c.l0CompactionTrigger.ResumeL0SegmentCompacting(job.GetCollectionID())
log.Info("start to pause l0 segment compacting", zap.Int64("jobID", job.GetJobID()))
<-c.l0CompactionTrigger.GetPauseCompactionChan(job.GetJobID(), job.GetCollectionID())
log.Info("l0 segment compacting paused", zap.Int64("jobID", job.GetJobID()))

if c.waitL0ImortTaskDone(job) {
return
}
waitL0ImportDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageWaitL0Import).Observe(float64(waitL0ImportDuration.Milliseconds()))
log.Info("import job l0 import done", zap.Duration("jobTimeCost/l0Import", waitL0ImportDuration))

if c.updateSegmentState(job, originSegmentIDs, statsSegmentIDs) {
return
}
// all finished, update import job state to `Completed`.
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
<-c.l0CompactionTrigger.GetResumeCompactionChan(job.GetJobID(), job.GetCollectionID())
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
}

func (c *importChecker) waitL0ImortTaskDone(job ImportJob) bool {
// wait for all l0 import tasks to be completed
l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
for _, t := range l0ImportTasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
return
log.Info("waiting for l0 import task...",
zap.Int64s("taskIDs", lo.Map(l0ImportTasks, func(t ImportTask, _ int) int64 {
return t.GetTaskID()
})))
return true
}
}
return false
}

buildIndexDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))

func (c *importChecker) updateSegmentState(job ImportJob, originSegmentIDs, statsSegmentIDs []int64) bool {
// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(context.TODO(), segmentID)
@@ -420,34 +452,24 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
channels, err := c.meta.GetSegmentsChannels(isImportingSegments)
if err != nil {
log.Warn("get segments channels failed", zap.Error(err))
return
return true
}
for _, segmentID := range isImportingSegments {
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil {
log.Warn("nil channel checkpoint")
return
return true
}
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
op2 := UpdateDmlPosition(segmentID, channelCP)
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(context.TODO(), op1, op2, op3)
if err != nil {
log.Warn("update import segment failed", zap.Error(err))
return
return true
}
}

// all finished, update import job state to `Completed`.
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
return false
}

func (c *importChecker) checkFailedJob(job ImportJob) {
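
Note: checkIndexBuildingJob now sequences the final stage of an import job around the new channels: pause L0 compaction, wait for the L0 import tasks spawned from L0 compaction, move the imported segments out of the importing state, mark the job Completed, and only then resume compaction. A condensed sketch of that ordering follows, with closures standing in for the real importChecker methods; the names and signatures here are illustrative, not the actual code.

package datacoordsketch // hypothetical package, for illustration only

// importJob and l0Trigger are stand-ins for the datacoord types in this diff.
type importJob interface {
	GetJobID() int64
	GetCollectionID() int64
}

type l0Trigger interface {
	GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
	GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
}

// finishImportJob shows the call ordering introduced by this commit.
func finishImportJob(trig l0Trigger, job importJob,
	l0TasksPending func(importJob) bool, // true while L0 import tasks are incomplete
	updateSegments func(importJob) bool, // true if the update must be retried
	completeJob func(importJob) error,
) error {
	// closed once in-flight L0 compaction for the collection has drained
	<-trig.GetPauseCompactionChan(job.GetJobID(), job.GetCollectionID())

	if l0TasksPending(job) {
		// the checker re-enters on its next tick; compaction stays paused
		return nil
	}
	if retry := updateSegments(job); retry {
		return nil
	}
	if err := completeJob(job); err != nil {
		return err
	}
	// closed once the skip flag is removed and the bookkeeping is cleaned up
	<-trig.GetResumeCompactionChan(job.GetJobID(), job.GetCollectionID())
	return nil
}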