From 51dd7313853650645e04a10c7bf859d9b5446544 Mon Sep 17 00:00:00 2001 From: SimFG Date: Thu, 23 Jan 2025 16:11:20 +0800 Subject: [PATCH] fix TestImportV2 unit case Signed-off-by: SimFG --- .github/workflows/main.yaml | 2 +- go.mod | 3 +- go.sum | 4 +- internal/datacoord/compaction_trigger_v2.go | 148 +++--- .../datacoord/compaction_trigger_v2_test.go | 6 +- internal/datacoord/import_checker.go | 68 ++- internal/datacoord/import_checker_test.go | 20 +- internal/datacoord/import_util_test.go | 14 +- internal/datacoord/mock_trigger_manager.go | 164 ++++--- internal/datacoord/server.go | 16 +- internal/datacoord/services.go | 30 +- internal/datacoord/services_test.go | 19 +- .../msghandlerimpl/msg_handler_impl_test.go | 21 +- .../pipeline/flow_graph_dd_node.go | 3 + internal/proxy/impl_test.go | 6 + internal/proxy/task_import.go | 11 +- pkg/go.mod | 4 +- pkg/go.sum | 4 +- pkg/metrics/metrics.go | 11 +- pkg/proto/data_coord.proto | 1 - pkg/proto/datapb/data_coord.pb.go | 2 +- pkg/streaming/util/message/message_type.go | 1 + scripts/run_intergration_test.sh | 2 + .../compaction/l0_compaction_test.go | 421 +++++++++--------- .../integration/import/dynamic_field_test.go | 2 +- tests/integration/import/import_test.go | 2 +- .../integration/import/partition_key_test.go | 2 +- 27 files changed, 558 insertions(+), 429 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 1c4752d4d6312..b9f5e7934f1ca 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -221,7 +221,7 @@ jobs: name: Integration Test needs: Build runs-on: ubuntu-latest - timeout-minutes: 120 + timeout-minutes: 150 steps: - name: Maximize build space uses: easimon/maximize-build-space@master diff --git a/go.mod b/go.mod index 2d6cf3c5f19ac..762afb11fd7a2 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 @@ -286,7 +286,6 @@ replace ( github.com/go-kit/kit => github.com/go-kit/kit v0.1.0 github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 - github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc github.com/milvus-io/milvus/pkg => ./pkg github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1 github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect diff --git a/go.sum b/go.sum index f01a9cbcefaef..2816f3b51a725 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0= github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= -github.com/SimFG/milvus-proto/go-api/v2 
v2.0.0-20250123022249-856e8ca907bc h1:DhqkZ5yhNm9YMOJjxhaptgH2sgXmOYYIGv7Bj9JBZlo= -github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= @@ -662,6 +660,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b h1:s3gdV+iYJMvo9mydBJVAEA2Uaz29eIuUnQK867U3G8I= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 9bf8a18bcd005..4ebf3cbc7616f 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -63,8 +63,8 @@ type TriggerManager interface { Stop() OnCollectionUpdate(collectionID int64) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) - PauseL0SegmentCompacting(collectionID int64) - ResumeL0SegmentCompacting(collectionID int64) + GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} + GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} } var _ TriggerManager = (*CompactionTriggerManager)(nil) @@ -90,30 +90,27 @@ type CompactionTriggerManager struct { clusteringPolicy *clusteringCompactionPolicy singlePolicy *singleCompactionPolicy -<<<<<<< HEAD cancel context.CancelFunc closeWg sync.WaitGroup -======= + l0Triggering bool l0SigLock *sync.Mutex l0TickSig *sync.Cond - closeSig chan struct{} - closeWg sync.WaitGroup ->>>>>>> support to replicate import msg + pauseCompactionChanMap map[int64]chan struct{} + resumeCompactionChanMap map[int64]chan struct{} + compactionChanLock sync.Mutex } func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta, imeta ImportMeta) *CompactionTriggerManager { m := &CompactionTriggerManager{ - allocator: alloc, - handler: handler, - compactionHandler: compactionHandler, - meta: meta, -<<<<<<< HEAD -======= - imeta: imeta, - closeSig: make(chan struct{}), ->>>>>>> support to replicate import msg + allocator: alloc, + handler: handler, + compactionHandler: compactionHandler, + meta: meta, + imeta: imeta, + pauseCompactionChanMap: make(map[int64]chan struct{}), + resumeCompactionChanMap: make(map[int64]chan struct{}), } m.l0SigLock = &sync.Mutex{} m.l0TickSig = sync.NewCond(m.l0SigLock) @@ -146,23 +143,62 @@ func (m *CompactionTriggerManager) 
Stop() { m.closeWg.Wait() } -func (m *CompactionTriggerManager) PauseL0SegmentCompacting(collectionID int64) { +func (m *CompactionTriggerManager) pauseL0SegmentCompacting(jobID, collectionID int64) { m.l0Policy.AddSkipCollection(collectionID) m.l0SigLock.Lock() for m.l0Triggering { m.l0TickSig.Wait() } m.l0SigLock.Unlock() + m.compactionChanLock.Lock() + if ch, ok := m.pauseCompactionChanMap[jobID]; ok { + close(ch) + } + m.compactionChanLock.Unlock() } -func (m *CompactionTriggerManager) ResumeL0SegmentCompacting(collectionID int64) { +func (m *CompactionTriggerManager) resumeL0SegmentCompacting(jobID, collectionID int64) { + m.compactionChanLock.Lock() m.l0Policy.RemoveSkipCollection(collectionID) + if ch, ok := m.resumeCompactionChanMap[jobID]; ok { + close(ch) + delete(m.pauseCompactionChanMap, jobID) + delete(m.resumeCompactionChanMap, jobID) + } + m.compactionChanLock.Unlock() +} + +func (m *CompactionTriggerManager) GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} { + m.compactionChanLock.Lock() + defer m.compactionChanLock.Unlock() + if ch, ok := m.pauseCompactionChanMap[jobID]; ok { + return ch + } + ch := make(chan struct{}) + m.pauseCompactionChanMap[jobID] = ch + go m.pauseL0SegmentCompacting(jobID, collectionID) + return ch +} + +func (m *CompactionTriggerManager) GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} { + m.compactionChanLock.Lock() + defer m.compactionChanLock.Unlock() + if ch, ok := m.resumeCompactionChanMap[jobID]; ok { + return ch + } + ch := make(chan struct{}) + m.resumeCompactionChanMap[jobID] = ch + go m.resumeL0SegmentCompacting(jobID, collectionID) + return ch } func (m *CompactionTriggerManager) setL0Triggering(b bool) { m.l0SigLock.Lock() defer m.l0SigLock.Unlock() m.l0Triggering = b + if !b { + m.l0TickSig.Broadcast() + } } func (m *CompactionTriggerManager) loop(ctx context.Context) { @@ -193,6 +229,7 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) { events, err := m.l0Policy.Trigger() if err != nil { log.Warn("Fail to trigger L0 policy", zap.Error(err)) + m.setL0Triggering(false) continue } if len(events) > 0 { @@ -303,6 +340,45 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, return } + err = m.addL0ImportTaskForImport(ctx, collection, view) + if err != nil { + log.Warn("Failed to submit compaction view to scheduler because add l0 import task fail", zap.Error(err)) + return + } + + task := &datapb.CompactionTask{ + TriggerID: taskID, // inner trigger, use task id as trigger id + PlanID: taskID, + Type: datapb.CompactionType_Level0DeleteCompaction, + StartTime: time.Now().Unix(), + InputSegments: levelZeroSegs, + State: datapb.CompactionTaskState_pipelining, + Channel: view.GetGroupLabel().Channel, + CollectionID: view.GetGroupLabel().CollectionID, + PartitionID: view.GetGroupLabel().PartitionID, + Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, + TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), + Schema: collection.Schema, + } + + err = m.compactionHandler.enqueueCompaction(task) + if err != nil { + log.Warn("Failed to execute compaction task", + zap.Int64("triggerID", task.GetTriggerID()), + zap.Int64("planID", task.GetPlanID()), + zap.Int64s("segmentIDs", task.GetInputSegments()), + zap.Error(err)) + return + } + log.Info("Finish to submit a LevelZeroCompaction plan", + zap.Int64("triggerID", task.GetTriggerID()), + zap.Int64("planID", task.GetPlanID()), + zap.String("type", task.GetType().String()), + zap.Int64s("L0 
segments", levelZeroSegs), + ) +} + +func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error { // add l0 import task for the collection if the collection is importing importJobs := m.imeta.GetJobBy(ctx, WithCollectionID(collection.ID), WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed)) if len(importJobs) > 0 { @@ -356,49 +432,19 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, }, job, m.allocator, m.meta) if err != nil { log.Warn("new import tasks failed", zap.Error(err)) - return + return err } for _, t := range newTasks { err = m.imeta.AddTask(ctx, t) if err != nil { log.Warn("add new l0 import task from l0 compaction failed", WrapTaskLog(t, zap.Error(err))...) - return + return err } log.Info("add new l0 import task from l0 compaction", WrapTaskLog(t)...) } } } - - task := &datapb.CompactionTask{ - TriggerID: taskID, // inner trigger, use task id as trigger id - PlanID: taskID, - Type: datapb.CompactionType_Level0DeleteCompaction, - StartTime: time.Now().Unix(), - InputSegments: levelZeroSegs, - State: datapb.CompactionTaskState_pipelining, - Channel: view.GetGroupLabel().Channel, - CollectionID: view.GetGroupLabel().CollectionID, - PartitionID: view.GetGroupLabel().PartitionID, - Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, - TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), - Schema: collection.Schema, - } - - err = m.compactionHandler.enqueueCompaction(task) - if err != nil { - log.Warn("Failed to execute compaction task", - zap.Int64("triggerID", task.GetTriggerID()), - zap.Int64("planID", task.GetPlanID()), - zap.Int64s("segmentIDs", task.GetInputSegments()), - zap.Error(err)) - return - } - log.Info("Finish to submit a LevelZeroCompaction plan", - zap.Int64("triggerID", task.GetTriggerID()), - zap.Int64("planID", task.GetPlanID()), - zap.String("type", task.GetType().String()), - zap.Int64s("L0 segments", levelZeroSegs), - ) + return nil } func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) { diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index c55460daee9ca..18ff9c7143ed6 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -380,8 +380,10 @@ func TestCompactionAndImport(t *testing.T) { }).Return(nil) mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() - triggerManager.PauseL0SegmentCompacting(100) - defer triggerManager.ResumeL0SegmentCompacting(100) + <-triggerManager.GetPauseCompactionChan(100, 10) + defer func() { + <-triggerManager.GetResumeCompactionChan(100, 10) + }() triggerManager.Start() defer triggerManager.Stop() diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 5701dc44ae1d2..7909151205be1 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -96,7 +96,7 @@ func (c *importChecker) Start() { log.Info("waiting for all channels to send signals", zap.Strings("vchannels", job.GetVchannels()), zap.Strings("readyVchannels", job.GetReadyVchannels()), - zap.Int64("jobID", job.GetJobID())) // TODO fubang + zap.Int64("jobID", job.GetJobID())) continue } switch job.GetState() { @@ -215,7 +215,6 @@ func (c *importChecker) checkPendingJob(job ImportJob) { log := log.With(zap.Int64("jobID", 
job.GetJobID()))
 	lacks := c.getLackFilesForPreImports(job)
 	if len(lacks) == 0 {
-		log.Info("import job start to preimport") // TODO fubang
 		return
 	}
 	fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
@@ -393,21 +392,54 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
 		log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
 		return
 	}
+	buildIndexDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
+	log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))
 
 	// wait l0 segment import and block l0 compaction
-	c.l0CompactionTrigger.PauseL0SegmentCompacting(job.GetCollectionID())
-	defer c.l0CompactionTrigger.ResumeL0SegmentCompacting(job.GetCollectionID())
+	log.Info("start to pause l0 segment compacting", zap.Int64("jobID", job.GetJobID()))
+	<-c.l0CompactionTrigger.GetPauseCompactionChan(job.GetJobID(), job.GetCollectionID())
+	log.Info("l0 segment compacting paused", zap.Int64("jobID", job.GetJobID()))
+
+	if c.waitL0ImportTaskDone(job) {
+		return
+	}
+	waitL0ImportDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageWaitL0Import).Observe(float64(waitL0ImportDuration.Milliseconds()))
+	log.Info("import job l0 import done", zap.Duration("jobTimeCost/l0Import", waitL0ImportDuration))
+
+	if c.updateSegmentState(job, originSegmentIDs, statsSegmentIDs) {
+		return
+	}
+
+	// all finished, update import job state to `Completed`.
+	completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
+	err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
+	if err != nil {
+		log.Warn("failed to update job state to Completed", zap.Error(err))
+		return
+	}
+	totalDuration := job.GetTR().ElapseSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
+	<-c.l0CompactionTrigger.GetResumeCompactionChan(job.GetJobID(), job.GetCollectionID())
+	log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
+}
+
+func (c *importChecker) waitL0ImportTaskDone(job ImportJob) bool {
+	// wait for all l0 import tasks to be completed
 	l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
 	for _, t := range l0ImportTasks {
 		if t.GetState() != datapb.ImportTaskStateV2_Completed {
-			return
+			log.Info("waiting for l0 import task...",
+				zap.Int64s("taskIDs", lo.Map(l0ImportTasks, func(t ImportTask, _ int) int64 {
+					return t.GetTaskID()
+				})))
+			return true
 		}
 	}
+	return false
+}
 
-	buildIndexDuration := job.GetTR().RecordSpan()
-	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
-	log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))
-
+func (c *importChecker) updateSegmentState(job ImportJob, originSegmentIDs, statsSegmentIDs []int64) bool {
 	// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
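+	// The two helpers split out of checkIndexBuildingJob (waitL0ImportTaskDone and updateSegmentState)
+	// share one return convention: true means "bail out and retry on the next checker tick", false means
+	// the step finished. A sketch of how checkIndexBuildingJob above consumes them:
+	//
+	//	if c.waitL0ImportTaskDone(job) { // an L0 import task is still pending
+	//		return
+	//	}
+	//	if c.updateSegmentState(job, originSegmentIDs, statsSegmentIDs) { // a metadata update failed, retry later
+	//		return
+	//	}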
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool { segment := c.meta.GetSegment(context.TODO(), segmentID) @@ -420,13 +452,13 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { channels, err := c.meta.GetSegmentsChannels(isImportingSegments) if err != nil { log.Warn("get segments channels failed", zap.Error(err)) - return + return true } for _, segmentID := range isImportingSegments { channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) if channelCP == nil { log.Warn("nil channel checkpoint") - return + return true } op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) op2 := UpdateDmlPosition(segmentID, channelCP) @@ -434,20 +466,10 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { err = c.meta.UpdateSegmentsInfo(context.TODO(), op1, op2, op3) if err != nil { log.Warn("update import segment failed", zap.Error(err)) - return + return true } } - - // all finished, update import job state to `Completed`. - completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") - err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) - if err != nil { - log.Warn("failed to update job state to Completed", zap.Error(err)) - return - } - totalDuration := job.GetTR().ElapseSpan() - metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds())) - log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration)) + return false } func (c *importChecker) checkFailedJob(job ImportJob) { diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index d3d00d793031b..0f33718979efb 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/indexpb" "github.com/milvus-io/milvus/pkg/proto/internalpb" + "github.com/milvus-io/milvus/pkg/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -79,8 +80,10 @@ func (s *ImportCheckerSuite) SetupTest() { sjm := NewMockStatsJobManager(s.T()) l0CompactionTrigger := NewMockTriggerManager(s.T()) - l0CompactionTrigger.EXPECT().PauseL0SegmentCompacting(mock.Anything).Return().Maybe() - l0CompactionTrigger.EXPECT().ResumeL0SegmentCompacting(mock.Anything).Return().Maybe() + compactionChan := make(chan struct{}, 1) + close(compactionChan) + l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() + l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() checker := NewImportChecker(meta, broker, cluster, s.alloc, imeta, sjm, l0CompactionTrigger).(*importChecker) s.checker = checker @@ -528,7 +531,6 @@ func TestImportCheckerCompaction(t *testing.T) { catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil) catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) 
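+	// The trigger mocks further down in this test return a channel that has already been closed, so the
+	// checker's blocking receives on GetPauseCompactionChan/GetResumeCompactionChan return immediately here.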
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -543,15 +545,15 @@ func TestImportCheckerCompaction(t *testing.T) { imeta, err := NewImportMeta(context.TODO(), catalog) assert.NoError(t, err) - meta, err := newMeta(context.TODO(), catalog, nil) - assert.NoError(t, err) - broker := broker2.NewMockBroker(t) - + broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) sjm := NewMockStatsJobManager(t) l0CompactionTrigger := NewMockTriggerManager(t) - l0CompactionTrigger.EXPECT().PauseL0SegmentCompacting(mock.Anything).Return().Maybe() - l0CompactionTrigger.EXPECT().ResumeL0SegmentCompacting(mock.Anything).Return().Maybe() + compactionChan := make(chan struct{}, 1) + close(compactionChan) + l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() + l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() checker := NewImportChecker(meta, broker, cluster, alloc, imeta, sjm, l0CompactionTrigger).(*importChecker) diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 5ae82e8f9f983..73ea2dc7f8bdd 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" + broker2 "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/metastore/mocks" mocks2 "github.com/milvus-io/milvus/internal/mocks" @@ -41,6 +42,7 @@ import ( "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/indexpb" "github.com/milvus-io/milvus/pkg/proto/internalpb" + "github.com/milvus-io/milvus/pkg/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -165,17 +167,18 @@ func TestImportUtil_NewImportTasksWithDataTt(t *testing.T) { alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil) catalog := mocks.NewDataCoordCatalog(t) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) - catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker2.NewMockBroker(t) + broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) tasks, err := NewImportTasks(fileGroups, job, alloc, meta) @@ -285,7 +288,6 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) { } catalog := mocks.NewDataCoordCatalog(t) - 
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -301,7 +303,9 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) { return id, id + n, nil }) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker2.NewMockBroker(t) + broker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{}, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) segment := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ID: 5, IsImporting: true}, diff --git a/internal/datacoord/mock_trigger_manager.go b/internal/datacoord/mock_trigger_manager.go index 15c10a8430d40..bf5d80c4f9c59 100644 --- a/internal/datacoord/mock_trigger_manager.go +++ b/internal/datacoord/mock_trigger_manager.go @@ -21,6 +21,104 @@ func (_m *MockTriggerManager) EXPECT() *MockTriggerManager_Expecter { return &MockTriggerManager_Expecter{mock: &_m.Mock} } +// GetPauseCompactionChan provides a mock function with given fields: jobID, collectionID +func (_m *MockTriggerManager) GetPauseCompactionChan(jobID int64, collectionID int64) <-chan struct{} { + ret := _m.Called(jobID, collectionID) + + if len(ret) == 0 { + panic("no return value specified for GetPauseCompactionChan") + } + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func(int64, int64) <-chan struct{}); ok { + r0 = rf(jobID, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockTriggerManager_GetPauseCompactionChan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPauseCompactionChan' +type MockTriggerManager_GetPauseCompactionChan_Call struct { + *mock.Call +} + +// GetPauseCompactionChan is a helper method to define mock.On call +// - jobID int64 +// - collectionID int64 +func (_e *MockTriggerManager_Expecter) GetPauseCompactionChan(jobID interface{}, collectionID interface{}) *MockTriggerManager_GetPauseCompactionChan_Call { + return &MockTriggerManager_GetPauseCompactionChan_Call{Call: _e.mock.On("GetPauseCompactionChan", jobID, collectionID)} +} + +func (_c *MockTriggerManager_GetPauseCompactionChan_Call) Run(run func(jobID int64, collectionID int64)) *MockTriggerManager_GetPauseCompactionChan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64)) + }) + return _c +} + +func (_c *MockTriggerManager_GetPauseCompactionChan_Call) Return(_a0 <-chan struct{}) *MockTriggerManager_GetPauseCompactionChan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTriggerManager_GetPauseCompactionChan_Call) RunAndReturn(run func(int64, int64) <-chan struct{}) *MockTriggerManager_GetPauseCompactionChan_Call { + _c.Call.Return(run) + return _c +} + +// GetResumeCompactionChan provides a mock function with given fields: jobID, collectionID +func (_m *MockTriggerManager) GetResumeCompactionChan(jobID int64, collectionID int64) <-chan struct{} { + ret := _m.Called(jobID, collectionID) + + if len(ret) == 0 { + panic("no return value specified for GetResumeCompactionChan") + } + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func(int64, int64) <-chan struct{}); ok { + r0 = rf(jobID, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// 
MockTriggerManager_GetResumeCompactionChan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetResumeCompactionChan' +type MockTriggerManager_GetResumeCompactionChan_Call struct { + *mock.Call +} + +// GetResumeCompactionChan is a helper method to define mock.On call +// - jobID int64 +// - collectionID int64 +func (_e *MockTriggerManager_Expecter) GetResumeCompactionChan(jobID interface{}, collectionID interface{}) *MockTriggerManager_GetResumeCompactionChan_Call { + return &MockTriggerManager_GetResumeCompactionChan_Call{Call: _e.mock.On("GetResumeCompactionChan", jobID, collectionID)} +} + +func (_c *MockTriggerManager_GetResumeCompactionChan_Call) Run(run func(jobID int64, collectionID int64)) *MockTriggerManager_GetResumeCompactionChan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64)) + }) + return _c +} + +func (_c *MockTriggerManager_GetResumeCompactionChan_Call) Return(_a0 <-chan struct{}) *MockTriggerManager_GetResumeCompactionChan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTriggerManager_GetResumeCompactionChan_Call) RunAndReturn(run func(int64, int64) <-chan struct{}) *MockTriggerManager_GetResumeCompactionChan_Call { + _c.Call.Return(run) + return _c +} + // ManualTrigger provides a mock function with given fields: ctx, collectionID, clusteringCompaction func (_m *MockTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (int64, error) { ret := _m.Called(ctx, collectionID, clusteringCompaction) @@ -112,72 +210,6 @@ func (_c *MockTriggerManager_OnCollectionUpdate_Call) RunAndReturn(run func(int6 return _c } -// PauseL0SegmentCompacting provides a mock function with given fields: collectionID -func (_m *MockTriggerManager) PauseL0SegmentCompacting(collectionID int64) { - _m.Called(collectionID) -} - -// MockTriggerManager_PauseL0SegmentCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PauseL0SegmentCompacting' -type MockTriggerManager_PauseL0SegmentCompacting_Call struct { - *mock.Call -} - -// PauseL0SegmentCompacting is a helper method to define mock.On call -// - collectionID int64 -func (_e *MockTriggerManager_Expecter) PauseL0SegmentCompacting(collectionID interface{}) *MockTriggerManager_PauseL0SegmentCompacting_Call { - return &MockTriggerManager_PauseL0SegmentCompacting_Call{Call: _e.mock.On("PauseL0SegmentCompacting", collectionID)} -} - -func (_c *MockTriggerManager_PauseL0SegmentCompacting_Call) Run(run func(collectionID int64)) *MockTriggerManager_PauseL0SegmentCompacting_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockTriggerManager_PauseL0SegmentCompacting_Call) Return() *MockTriggerManager_PauseL0SegmentCompacting_Call { - _c.Call.Return() - return _c -} - -func (_c *MockTriggerManager_PauseL0SegmentCompacting_Call) RunAndReturn(run func(int64)) *MockTriggerManager_PauseL0SegmentCompacting_Call { - _c.Call.Return(run) - return _c -} - -// ResumeL0SegmentCompacting provides a mock function with given fields: collectionID -func (_m *MockTriggerManager) ResumeL0SegmentCompacting(collectionID int64) { - _m.Called(collectionID) -} - -// MockTriggerManager_ResumeL0SegmentCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResumeL0SegmentCompacting' -type MockTriggerManager_ResumeL0SegmentCompacting_Call struct { - *mock.Call -} - -// ResumeL0SegmentCompacting is a 
helper method to define mock.On call -// - collectionID int64 -func (_e *MockTriggerManager_Expecter) ResumeL0SegmentCompacting(collectionID interface{}) *MockTriggerManager_ResumeL0SegmentCompacting_Call { - return &MockTriggerManager_ResumeL0SegmentCompacting_Call{Call: _e.mock.On("ResumeL0SegmentCompacting", collectionID)} -} - -func (_c *MockTriggerManager_ResumeL0SegmentCompacting_Call) Run(run func(collectionID int64)) *MockTriggerManager_ResumeL0SegmentCompacting_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockTriggerManager_ResumeL0SegmentCompacting_Call) Return() *MockTriggerManager_ResumeL0SegmentCompacting_Call { - _c.Call.Return() - return _c -} - -func (_c *MockTriggerManager_ResumeL0SegmentCompacting_Call) RunAndReturn(run func(int64)) *MockTriggerManager_ResumeL0SegmentCompacting_Call { - _c.Call.Return(run) - return _c -} - // Start provides a mock function with given fields: func (_m *MockTriggerManager) Start() { _m.Called() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7123401befd02..61928a0b6a3ed 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -382,6 +382,10 @@ func (s *Server) initDataCoord() error { } log.Info("init service discovery done") + s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog) + if err != nil { + return err + } s.initCompaction() log.Info("init compaction done") @@ -391,13 +395,6 @@ func (s *Server) initDataCoord() error { s.initJobManager() log.Info("init statsJobManager done") - s.importMeta, err = NewImportMeta(s.ctx, s.meta.catalog) - if err != nil { - return err - } - s.initCompaction() - log.Info("init compaction done") - if err = s.initSegmentManager(); err != nil { return err } @@ -706,15 +703,10 @@ func (s *Server) initIndexNodeManager() { } func (s *Server) initCompaction() { -<<<<<<< HEAD cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler) cph.loadMeta() s.compactionHandler = cph - s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta) -======= - s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler) s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta, s.importMeta) ->>>>>>> support to replicate import msg s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 9cc587315c22d..6b2ddb0597e86 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/componentutil" @@ -43,7 +42,6 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/internalpb" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ 
-1725,16 +1723,20 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal
 		log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
 	}
 
+	// Job-count limiting for import is intentionally skipped for now and needs additional work later.
+	// ImportV2 is now driven by broadcast messages and must stay idempotent, so keeping this check
+	// would make a replayed import message retry indefinitely until the previous import job completes.
+
 	// Check if the number of jobs exceeds the limit.
-	maxNum := paramtable.Get().DataCoordCfg.MaxImportJobNum.GetAsInt()
-	executingNum := s.importMeta.CountJobBy(ctx, WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
-	if executingNum >= maxNum {
-		resp.Status = merr.Status(merr.WrapErrImportFailed(
-			fmt.Sprintf("The number of jobs has reached the limit, please try again later. " +
-				"If your request is set to only import a single file, " +
-				"please consider importing multiple files in one request for better efficiency.")))
-		return resp, nil
-	}
+	// maxNum := paramtable.Get().DataCoordCfg.MaxImportJobNum.GetAsInt()
+	// executingNum := s.importMeta.CountJobBy(ctx, WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
+	// if executingNum >= maxNum {
+	// 	resp.Status = merr.Status(merr.WrapErrImportFailed(
+	// 		fmt.Sprintf("The number of jobs has reached the limit, please try again later. " +
+	// 			"If your request is set to only import a single file, " +
+	// 			"please consider importing multiple files in one request for better efficiency.")))
+	// 	return resp, nil
+	// }
 
 	// Allocate file ids.
 	idStart, _, err := s.allocator.AllocN(int64(len(files)) + 1)
@@ -1812,12 +1814,6 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
 		return resp, nil
 	}
 
-	// Import is asynchronous consumed from the wal, so we need to wait for the wal to release the resource key.
-	// The job can be seen by the user after the resource key is acked once at any vchannel.
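+	// Note: this wait is not dropped from the flow; the same BlockUntilResourceKeyAckOnce call is re-added
+	// on the proxy side in importTask.Execute (internal/proxy/task_import.go later in this patch), so the
+	// proxy now blocks until the import broadcast is acked before returning the job ID to the client.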
- if err := streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewImportJobIDResourceKey(jobID)); err != nil { - return nil, err - } - job := s.importMeta.GetJob(ctx, jobID) if job == nil { resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID))) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 51e9bd18caaf7..f3ebb030f47bb 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -22,9 +22,11 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/datacoord/session" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" mocks2 "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -1411,11 +1413,11 @@ func TestImportV2(t *testing.T) { assert.Equal(t, 1, len(jobs)) // number of jobs reached the limit - Params.Save(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key, "1") - resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{}) - assert.NoError(t, err) - assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed)) - Params.Reset(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key) + // Params.Save(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key, "1") + // resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{}) + // assert.NoError(t, err) + // assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed)) + // Params.Reset(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key) }) t.Run("GetImportProgress", func(t *testing.T) { @@ -1440,6 +1442,13 @@ func TestImportV2(t *testing.T) { catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) + wal := mock_streaming.NewMockWALAccesser(t) + b := mock_streaming.NewMockBroadcast(t) + wal.EXPECT().Broadcast().Return(b).Maybe() + b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil).Maybe() + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() + s.importMeta, err = NewImportMeta(context.TODO(), catalog) assert.NoError(t, err) resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{ diff --git a/internal/datanode/msghandlerimpl/msg_handler_impl_test.go b/internal/datanode/msghandlerimpl/msg_handler_impl_test.go index 59c9817270456..5b7367b4d47b3 100644 --- a/internal/datanode/msghandlerimpl/msg_handler_impl_test.go +++ b/internal/datanode/msghandlerimpl/msg_handler_impl_test.go @@ -19,15 +19,21 @@ package msghandlerimpl import ( + "context" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/flushcommon/broker" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestMsgHandlerImpl(t *testing.T) { + paramtable.Init() + ctx := context.Background() b := broker.NewMockBroker(t) m := NewMsgHandlerImpl(b) assert.Panics(t, func() { @@ 
-39,14 +45,17 @@ func TestMsgHandlerImpl(t *testing.T) { assert.Panics(t, func() { m.HandleManualFlush("", nil) }) - t.Run("HandleImport error", func(t *testing.T) { - b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() - err := m.HandleImport(nil, "", nil) - assert.Error(t, err) - }) t.Run("HandleImport success", func(t *testing.T) { + wal := mock_streaming.NewMockWALAccesser(t) + bo := mock_streaming.NewMockBroadcast(t) + wal.EXPECT().Broadcast().Return(bo) + bo.EXPECT().Ack(mock.Anything, mock.Anything).Return(nil) + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() + + b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() b.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(nil, nil).Once() - err := m.HandleImport(nil, "", nil) + err := m.HandleImport(ctx, "", nil) assert.NoError(t, err) }) } diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node.go b/internal/flushcommon/pipeline/flow_graph_dd_node.go index 3ff6824d51865..f9cd966868ae4 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node.go @@ -278,6 +278,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { } case commonpb.MsgType_Import: importMsg := msg.(*msgstream.ImportMsg) + if importMsg.GetCollectionID() != ddn.collectionID { + continue + } logger := log.With( zap.String("vchannel", ddn.Name()), zap.Int32("msgType", int32(msg.Type())), diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index fe77994b2b65b..10d3e3f529148 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1614,6 +1614,12 @@ func TestProxy_ReplicateMessage(t *testing.T) { } func TestProxy_ImportV2(t *testing.T) { + wal := mock_streaming.NewMockWALAccesser(t) + b := mock_streaming.NewMockBroadcast(t) + wal.EXPECT().Broadcast().Return(b).Maybe() + b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil).Maybe() + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() ctx := context.Background() mockErr := errors.New("mock error") diff --git a/internal/proxy/task_import.go b/internal/proxy/task_import.go index f6b4470ab2f47..bd6e9c3669b02 100644 --- a/internal/proxy/task_import.go +++ b/internal/proxy/task_import.go @@ -216,12 +216,12 @@ func (it *importTask) getChannels() []pChan { } func (it *importTask) Execute(ctx context.Context) error { - // TODO fubang should send mq msg jobID, err := it.node.rowIDAllocator.AllocOne() if err != nil { log.Ctx(ctx).Warn("alloc job id failed", zap.Error(err)) return err } + resourceKey := message.NewImportJobIDResourceKey(jobID) msg, err := message.NewImportMessageBuilderV1(). WithHeader(&message.ImportMessageHeader{}).WithBody( &msgpb.ImportMsg{ @@ -238,7 +238,7 @@ func (it *importTask) Execute(ctx context.Context) error { Schema: it.schema.CollectionSchema, JobID: jobID, }). - WithBroadcast(it.vchannels, message.NewImportJobIDResourceKey(jobID)). + WithBroadcast(it.vchannels, resourceKey). 
BuildBroadcast() if err != nil { log.Ctx(ctx).Warn("create import message failed", zap.Error(err)) @@ -249,13 +249,18 @@ func (it *importTask) Execute(ctx context.Context) error { log.Ctx(ctx).Warn("broadcast import msg failed", zap.Error(err)) return err } - log.Ctx(ctx).Debug( + log.Ctx(ctx).Info( "broadcast import msg success", zap.Int64("jobID", jobID), zap.Uint64("broadcastID", resp.BroadcastID), zap.Any("appendResults", resp.AppendResults), ) it.resp.JobID = strconv.FormatInt(jobID, 10) + // Import is asynchronous consumed from the wal, so we need to wait for the wal to release the resource key. + // The job can be seen by the user after the resource key is acked once at any vchannel. + if err := streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, resourceKey); err != nil { + return err + } return nil } diff --git a/pkg/go.mod b/pkg/go.mod index 13faafe2d1c58..56053d466ab6b 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -15,7 +15,7 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 @@ -181,10 +181,8 @@ replace ( github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 github.com/go-kit/kit => github.com/go-kit/kit v0.1.0 github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 - github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1 github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect - ) exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b diff --git a/pkg/go.sum b/pkg/go.sum index 66ca03826ed7f..1f4ce2d84eab8 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -57,8 +57,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0= github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= -github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc h1:DhqkZ5yhNm9YMOJjxhaptgH2sgXmOYYIGv7Bj9JBZlo= -github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250123022249-856e8ca907bc/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= @@ -494,6 +492,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb 
v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b h1:s3gdV+iYJMvo9mydBJVAEA2Uaz29eIuUnQK867U3G8I= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250214033407-ad08272e542b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index bf682f696defc..b6025b8bd953a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -78,11 +78,12 @@ const ( Executing = "executing" Done = "done" - ImportStagePending = "pending" - ImportStagePreImport = "preimport" - ImportStageImport = "import" - ImportStageStats = "stats" - ImportStageBuildIndex = "build_index" + ImportStagePending = "pending" + ImportStagePreImport = "preimport" + ImportStageImport = "import" + ImportStageStats = "stats" + ImportStageBuildIndex = "build_index" + ImportStageWaitL0Import = "wait_l0_import" compactionTypeLabelName = "compaction_type" isVectorFieldLabelName = "is_vector_field" diff --git a/pkg/proto/data_coord.proto b/pkg/proto/data_coord.proto index f94d70e612743..8f18ef510bde9 100644 --- a/pkg/proto/data_coord.proto +++ b/pkg/proto/data_coord.proto @@ -639,7 +639,6 @@ message CompactionPlanResult { repeated CompactionSegment segments = 3; string channel = 4; CompactionType type = 5; - // l0 import file path } message CompactionStateResponse { diff --git a/pkg/proto/datapb/data_coord.pb.go b/pkg/proto/datapb/data_coord.pb.go index 5bfe843ec3238..ad33bba7b3807 100644 --- a/pkg/proto/datapb/data_coord.pb.go +++ b/pkg/proto/datapb/data_coord.pb.go @@ -4818,7 +4818,7 @@ type CompactionPlanResult struct { State CompactionTaskState `protobuf:"varint,2,opt,name=state,proto3,enum=milvus.proto.data.CompactionTaskState" json:"state,omitempty"` Segments []*CompactionSegment `protobuf:"bytes,3,rep,name=segments,proto3" json:"segments,omitempty"` Channel string `protobuf:"bytes,4,opt,name=channel,proto3" json:"channel,omitempty"` - Type CompactionType `protobuf:"varint,5,opt,name=type,proto3,enum=milvus.proto.data.CompactionType" json:"type,omitempty"` // l0 import file path + Type CompactionType `protobuf:"varint,5,opt,name=type,proto3,enum=milvus.proto.data.CompactionType" json:"type,omitempty"` } func (x *CompactionPlanResult) Reset() { diff --git a/pkg/streaming/util/message/message_type.go b/pkg/streaming/util/message/message_type.go index 05f017e3e5e6c..ca3f21bb1ed78 100644 --- a/pkg/streaming/util/message/message_type.go +++ b/pkg/streaming/util/message/message_type.go @@ -43,6 +43,7 @@ var messageTypeName = map[MessageType]string{ MessageTypeBeginTxn: "BEGIN_TXN", MessageTypeCommitTxn: "COMMIT_TXN", MessageTypeRollbackTxn: "ROLLBACK_TXN", + MessageTypeImport: "IMPORT", } // String implements fmt.Stringer interface. diff --git a/scripts/run_intergration_test.sh b/scripts/run_intergration_test.sh index 999387e43c8d7..2c9a1ecd48a36 100755 --- a/scripts/run_intergration_test.sh +++ b/scripts/run_intergration_test.sh @@ -40,6 +40,8 @@ for d in $(go list ./tests/integration/...); do echo "running coordrecovery" # simplified command to speed up coord init test since it is large. 
$TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m + elif [[ $d == *"import"* ]]; then + go test -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=60m else $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m fi diff --git a/tests/integration/compaction/l0_compaction_test.go b/tests/integration/compaction/l0_compaction_test.go index 68ebb57289fa1..26f864b19325f 100644 --- a/tests/integration/compaction/l0_compaction_test.go +++ b/tests/integration/compaction/l0_compaction_test.go @@ -16,223 +16,224 @@ package compaction -// "context" -// "fmt" -// "time" - -// "github.com/samber/lo" -// "go.uber.org/zap" -// "google.golang.org/protobuf/proto" - -// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -// "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" -// "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" -// "github.com/milvus-io/milvus/pkg/common" -// "github.com/milvus-io/milvus/pkg/log" -// "github.com/milvus-io/milvus/pkg/proto/datapb" -// "github.com/milvus-io/milvus/pkg/util/funcutil" -// "github.com/milvus-io/milvus/pkg/util/merr" -// "github.com/milvus-io/milvus/pkg/util/metric" -// "github.com/milvus-io/milvus/pkg/util/paramtable" -// "github.com/milvus-io/milvus/tests/integration" +import ( + "context" + "fmt" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) func (s *CompactionSuite) TestL0Compaction() { - // TODO fubang - // ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) - // defer cancel() - // c := s.Cluster - - // const ( - // dim = 128 - // dbName = "" - // rowNum = 100000 - // deleteCnt = 50000 - - // indexType = integration.IndexFaissIvfFlat - // metricType = metric.L2 - // vecType = schemapb.DataType_FloatVector - // ) - - // paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1") - // defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) - - // collectionName := "TestCompaction_" + funcutil.GenRandomStr() - - // schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType) - // marshaledSchema, err := proto.Marshal(schema) - // s.NoError(err) - - // // create collection - // createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // Schema: marshaledSchema, - // ShardsNum: common.DefaultShardsNum, - // ConsistencyLevel: commonpb.ConsistencyLevel_Strong, - // }) - // err = merr.CheckRPCCall(createCollectionStatus, err) - // s.NoError(err) - // log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - - // // show collection - // showCollectionsResp, err := c.Proxy.ShowCollections(ctx, 
&milvuspb.ShowCollectionsRequest{}) - // err = merr.CheckRPCCall(showCollectionsResp, err) - // s.NoError(err) - // log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - - // // insert - // pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum) - // fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) - // hashKeys := integration.GenerateHashKeys(rowNum) - // insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn}, - // HashKeys: hashKeys, - // NumRows: uint32(rowNum), - // }) - // err = merr.CheckRPCCall(insertResult, err) - // s.NoError(err) - // s.Equal(int64(rowNum), insertResult.GetInsertCnt()) - - // // flush - // flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ - // DbName: dbName, - // CollectionNames: []string{collectionName}, - // }) - // err = merr.CheckRPCCall(flushResp, err) - // s.NoError(err) - // segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] - // ids := segmentIDs.GetData() - // s.Require().NotEmpty(segmentIDs) - // s.Require().True(has) - // flushTs, has := flushResp.GetCollFlushTs()[collectionName] - // s.True(has) - // s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) - - // // create index - // createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ - // CollectionName: collectionName, - // FieldName: integration.FloatVecField, - // IndexName: "_default", - // ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), - // }) - // err = merr.CheckRPCCall(createIndexStatus, err) - // s.NoError(err) - // s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) - - // segments, err := c.MetaWatcher.ShowSegments() - // s.NoError(err) - // s.NotEmpty(segments) - // // stats task happened - // s.Equal(2, len(segments)) - // s.Equal(int64(rowNum), segments[0].GetNumOfRows()) - - // // load - // loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // }) - // err = merr.CheckRPCCall(loadStatus, err) - // s.NoError(err) - // s.WaitForLoad(ctx, collectionName) - - // // delete - // deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // Expr: fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt), - // }) - // err = merr.CheckRPCCall(deleteResult, err) - // s.NoError(err) - - // // flush l0 - // flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ - // DbName: dbName, - // CollectionNames: []string{collectionName}, - // }) - // err = merr.CheckRPCCall(flushResp, err) - // s.NoError(err) - // flushTs, has = flushResp.GetCollFlushTs()[collectionName] - // s.True(has) - // s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) - - // // query - // queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // Expr: "", - // OutputFields: []string{"count(*)"}, - // }) - // err = merr.CheckRPCCall(queryResult, err) - // s.NoError(err) - // s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) - - // // wait for l0 compaction completed - // showSegments := func() bool { - // segments, err = c.MetaWatcher.ShowSegments() - // s.NoError(err) - // s.NotEmpty(segments) - // log.Info("ShowSegments result", 
zap.Any("segments", segments)) - // flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { - // return segment.GetState() == commonpb.SegmentState_Flushed - // }) - // if len(flushed) == 1 && - // flushed[0].GetLevel() == datapb.SegmentLevel_L1 && - // flushed[0].GetNumOfRows() == rowNum { - // log.Info("l0 compaction done, wait for single compaction") - // } - // return len(flushed) == 1 && - // flushed[0].GetLevel() == datapb.SegmentLevel_L1 && - // flushed[0].GetNumOfRows() == rowNum-deleteCnt - // } - // for !showSegments() { - // select { - // case <-ctx.Done(): - // s.Fail("waiting for compaction timeout") - // return - // case <-time.After(1 * time.Second): - // } - // } - - // // search - // expr := fmt.Sprintf("%s > 0", integration.Int64Field) - // nq := 10 - // topk := 10 - // roundDecimal := -1 - // params := integration.GetSearchParams(indexType, metricType) - // searchReq := integration.ConstructSearchRequest("", collectionName, expr, - // integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) - - // searchResult, err := c.Proxy.Search(ctx, searchReq) - // err = merr.CheckRPCCall(searchResult, err) - // s.NoError(err) - // s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) - - // // query - // queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{ - // DbName: dbName, - // CollectionName: collectionName, - // Expr: "", - // OutputFields: []string{"count(*)"}, - // }) - // err = merr.CheckRPCCall(queryResult, err) - // s.NoError(err) - // s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) - - // // release collection - // status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ - // CollectionName: collectionName, - // }) - // err = merr.CheckRPCCall(status, err) - // s.NoError(err) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + c := s.Cluster + + const ( + dim = 128 + dbName = "" + rowNum = 100000 + deleteCnt = 50000 + + indexType = integration.IndexFaissIvfFlat + metricType = metric.L2 + vecType = schemapb.DataType_FloatVector + ) + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) + + collectionName := "TestCompaction_" + funcutil.GenRandomStr() + + schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + // create collection + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + ConsistencyLevel: commonpb.ConsistencyLevel_Strong, + }) + err = merr.CheckRPCCall(createCollectionStatus, err) + s.NoError(err) + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + + // show collection + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + err = merr.CheckRPCCall(showCollectionsResp, err) + s.NoError(err) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + // insert + pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + 
hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + err = merr.CheckRPCCall(insertResult, err) + s.NoError(err) + s.Equal(int64(rowNum), insertResult.GetInsertCnt()) + + // flush + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + err = merr.CheckRPCCall(flushResp, err) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + err = merr.CheckRPCCall(createIndexStatus, err) + s.NoError(err) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + segments, err := c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + // stats task happened + s.Equal(2, len(segments)) + s.Equal(int64(rowNum), segments[0].GetNumOfRows()) + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(loadStatus, err) + s.NoError(err) + s.WaitForLoad(ctx, collectionName) + + // delete + deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt), + }) + err = merr.CheckRPCCall(deleteResult, err) + s.NoError(err) + + // flush l0 + flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + err = merr.CheckRPCCall(flushResp, err) + s.NoError(err) + flushTs, has = flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + // query + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + + // wait for l0 compaction completed + showSegments := func() bool { + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + log.Info("ShowSegments result", zap.Any("segments", segments)) + flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetState() == commonpb.SegmentState_Flushed + }) + if len(flushed) == 1 && + flushed[0].GetLevel() == datapb.SegmentLevel_L1 && + flushed[0].GetNumOfRows() == rowNum { + log.Info("l0 compaction done, wait for single compaction") + } + return len(flushed) == 1 && + flushed[0].GetLevel() == datapb.SegmentLevel_L1 && + flushed[0].GetNumOfRows() == rowNum-deleteCnt + } + for !showSegments() { + select { + case <-ctx.Done(): + s.Fail("waiting for compaction timeout") + return + case <-time.After(1 * time.Second): + } + 
} + + // search + expr := fmt.Sprintf("%s > 0", integration.Int64Field) + nq := 10 + topk := 10 + roundDecimal := -1 + params := integration.GetSearchParams(indexType, metricType) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) + + searchResult, err := c.Proxy.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + s.NoError(err) + s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + + // query + queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + + // release collection + status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) // drop collection // status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ - // CollectionName: collectionName, + // CollectionName: collectionName, // }) // err = merr.CheckRPCCall(status, err) // s.NoError(err) - // log.Info("Test compaction succeed") + log.Info("Test compaction succeed") } diff --git a/tests/integration/import/dynamic_field_test.go b/tests/integration/import/dynamic_field_test.go index 14b1ab1c7d89f..f0d89dca486b0 100644 --- a/tests/integration/import/dynamic_field_test.go +++ b/tests/integration/import/dynamic_field_test.go @@ -47,7 +47,7 @@ func (s *BulkInsertSuite) testImportDynamicField() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second) defer cancel() collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr() diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go index a979cd1186306..8891428949023 100644 --- a/tests/integration/import/import_test.go +++ b/tests/integration/import/import_test.go @@ -76,7 +76,7 @@ func (s *BulkInsertSuite) run() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second) defer cancel() collectionName := "TestBulkInsert" + funcutil.GenRandomStr() diff --git a/tests/integration/import/partition_key_test.go b/tests/integration/import/partition_key_test.go index 8ff58df999160..73fc98e5ab063 100644 --- a/tests/integration/import/partition_key_test.go +++ b/tests/integration/import/partition_key_test.go @@ -46,7 +46,7 @@ func (s *BulkInsertSuite) TestImportWithPartitionKey() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 240*time.Second) defer cancel() collectionName := "TestBulkInsert_WithPartitionKey_" + funcutil.GenRandomStr()
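
For reference, the re-enabled TestL0Compaction waits for level-zero compaction by polling MetaWatcher.ShowSegments until exactly one flushed L1 segment with rowNum-deleteCnt rows remains, bailing out when the test context expires. A minimal standalone sketch of that poll-until-condition pattern follows; the waitFor helper, its arguments, and the compactionDone stand-in are illustrative names, not part of the Milvus codebase or of this patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond at the given interval until it reports true or ctx is
// cancelled. The integration test inlines the same select loop around its
// showSegments closure.
func waitFor(ctx context.Context, interval time.Duration, cond func() bool) error {
	for !cond() {
		select {
		case <-ctx.Done():
			return errors.New("waiting for compaction timeout")
		case <-time.After(interval):
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Stand-in for the test's showSegments closure: pretend compaction
	// finishes after three polls.
	polls := 0
	compactionDone := func() bool {
		polls++
		return polls >= 3
	}

	if err := waitFor(ctx, time.Second, compactionDone); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("l0 compaction done after", polls, "polls")
}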