From a19df849eb37d0562723263bfb5eac3138c105d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 27 Feb 2024 17:31:58 +0000 Subject: [PATCH 01/14] Test qrep pause/unpause --- docker-compose-dev.yml | 2 +- docker-compose.yml | 2 +- flow/e2e/postgres/qrep_flow_pg_test.go | 56 ++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 8a8181ba6..210fc507d 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -56,7 +56,7 @@ services: catalog: condition: service_healthy environment: - - DB=postgres12 + - DB=postgresql - DB_PORT=5432 - POSTGRES_USER=postgres - POSTGRES_PWD=postgres diff --git a/docker-compose.yml b/docker-compose.yml index 7ac7e19bb..2e0a83be2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,7 +49,7 @@ services: catalog: condition: service_healthy environment: - - DB=postgres12 + - DB=postgresql - DB_PORT=5432 - POSTGRES_USER=postgres - POSTGRES_PWD=postgres diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index a7a367211..9753808a0 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -17,7 +17,9 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) type PeerFlowE2ETestSuitePG struct { @@ -324,3 +326,57 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) require.NoError(s.t, env.Error()) } + +func (s PeerFlowE2ETestSuitePG) Test_Pause() { + numRows := 10 + + srcTable := "qrep_pause" + s.setupSourceTable(srcTable, numRows) + + dstTable := "qrep_pause_dst" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.suffix, srcTable) + + config, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_columns_pg", + srcSchemaQualified, + dstSchemaQualified, + query, + e2e.GeneratePostgresPeer(), + "", + true, + "_PEERDB_SYNCED_AT", + ) + require.NoError(s.t, err) + + tc := e2e.NewTemporalClient(s.t) + env := e2e.RunQrepFlowWorkflow(tc, config) + e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) + + e2e.EnvWaitFor(s.t, env, time.Minute, "pausing", func() bool { + response, err := env.Query(shared.QRepFlowStateQuery) + if err != nil { + return false + } + var state peerflow.CDCFlowWorkflowState + err = response.Get(&state) + return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED + }) + e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal) + e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool { + response, err := env.Query(shared.QRepFlowStateQuery) + if err != nil { + return false + } + var state peerflow.CDCFlowWorkflowState + err = response.Get(&state) + return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING + }) + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} From 9451d8eb2733e87399712928ae52d5e5631fabf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 15:09:39 +0000 Subject: [PATCH 02/14] Log errors polling status --- flow/e2e/postgres/qrep_flow_pg_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 9753808a0..73806816f 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -360,21 +360,29 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { e2e.EnvWaitFor(s.t, env, time.Minute, "pausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) if err != nil { + s.t.Log(err) return false } var state peerflow.CDCFlowWorkflowState err = response.Get(&state) - return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED + if err != nil { + s.t.Fatal("decode failed", err) + } + return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED }) e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal) e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) if err != nil { + s.t.Log(err) return false } var state peerflow.CDCFlowWorkflowState err = response.Get(&state) - return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING + if err != nil { + s.t.Fatal("decode failed", err) + } + return state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING }) env.Cancel() From b46ff1ecbfd98918eabce31c09440d6af14ba349 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 15:25:31 +0000 Subject: [PATCH 03/14] fix state type --- flow/e2e/postgres/qrep_flow_pg_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 73806816f..a2d00b47a 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -19,7 +19,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - peerflow "github.com/PeerDB-io/peer-flow/workflows" ) type PeerFlowE2ETestSuitePG struct { @@ -363,7 +362,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { s.t.Log(err) return false } - var state peerflow.CDCFlowWorkflowState + var state *protos.QRepFlowState err = response.Get(&state) if err != nil { s.t.Fatal("decode failed", err) @@ -374,10 +373,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) if err != nil { - s.t.Log(err) - return false + s.t.Fatal(err) } - var state peerflow.CDCFlowWorkflowState + var state *protos.QRepFlowState err = response.Get(&state) if err != nil { s.t.Fatal("decode failed", err) From a8e18f60532ad61d86d223bc105043b078f8657a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 17:04:45 +0000 Subject: [PATCH 04/14] change name --- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index a2d00b47a..aa51b7d14 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -341,7 +341,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { s.suffix, srcTable) config, err := e2e.CreateQRepWorkflowConfig( - "test_qrep_columns_pg", + "test_qrep_pause_pg", srcSchemaQualified, dstSchemaQualified, query, From 09b29aafc0eb9a8e5f644df13faf3c606aecf287 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 18:26:45 +0000 Subject: [PATCH 05/14] more time --- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index aa51b7d14..ab651e5df 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -356,7 +356,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { env := e2e.RunQrepFlowWorkflow(tc, config) e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) - e2e.EnvWaitFor(s.t, env, time.Minute, "pausing", func() bool { + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) if err != nil { s.t.Log(err) From ee051c6fa813d6f5110c120885d5c2930509d496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 19:28:13 +0000 Subject: [PATCH 06/14] why --- flow/e2e/postgres/qrep_flow_pg_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index ab651e5df..fb49d3d24 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -351,6 +351,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { "_PEERDB_SYNCED_AT", ) require.NoError(s.t, err) + config.InitialCopyOnly = false tc := e2e.NewTemporalClient(s.t) env := e2e.RunQrepFlowWorkflow(tc, config) From 9d5bff7c2005a0aea2c0a0935a02490047cf01e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Mar 2024 20:36:07 +0000 Subject: [PATCH 07/14] qrep pause: move signal processing into waiting --- flow/workflows/qrep_flow.go | 72 ++++++++++++++++--------------------- flow/workflows/xmin_flow.go | 10 +++--- protos/flow.proto | 2 +- 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index c80e76c01..d37579f2f 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -46,19 +46,6 @@ func NewQRepFlowState() *protos.QRepFlowState { } } -// returns a new empty QRepFlowState -func NewQRepFlowStateForTesting() *protos.QRepFlowState { - return &protos.QRepFlowState{ - LastPartition: &protos.QRepPartition{ - PartitionId: "not-applicable-partition", - Range: nil, - }, - NumPartitionsProcessed: 0, - NeedsResync: true, - DisableWaitForNewRows: true, - } -} - // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { return &QRepFlowExecution{ @@ -291,20 +278,39 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { return nil } -func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error { +func (q *QRepFlowExecution) waitForNewRows( + ctx workflow.Context, + signalChan model.TypedReceiveChannel[model.CDCFlowSignal], + lastPartition *protos.QRepPartition, +) error { q.logger.Info("idling until new rows are detected") + var done bool + var doneErr error + selector := workflow.NewNamedSelector(ctx, "WaitForNewRows") + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years HeartbeatTimeout: time.Minute, }) + fWait := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, lastPartition) + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) + selector.AddFuture(fWait, func(f workflow.Future) { + doneErr = f.Get(ctx, nil) + done = true + }) + signalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) { + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + }) - if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, - lastPartition).Get(ctx, nil); err != nil { - return fmt.Errorf("failed while idling for new rows: %w", err) + for ctx.Err() != nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) { + selector.Select(ctx) } - return nil + if err := ctx.Err(); err != nil { + return err + } + return doneErr } func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error { @@ -364,16 +370,6 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } -func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) { - for { - val, ok := signalChan.ReceiveAsync() - if !ok { - break - } - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) - } -} - func setWorkflowQueries(ctx workflow.Context, state *protos.QRepFlowState) error { // Support an Update for the current status of the qrep flow. err := workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { @@ -475,13 +471,13 @@ func QRepFlowWorkflow( return err } - logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName) + logger.Info("fetching partitions to replicate for peer flow") partitions, err := q.GetPartitions(ctx, state.LastPartition) if err != nil { return fmt.Errorf("failed to get partitions: %w", err) } - logger.Info("partitions to replicate - ", len(partitions.Partitions)) + logger.Info(fmt.Sprintf("%d partitions to replicate", len(partitions.Partitions))) if err := q.processPartitions(ctx, maxParallelWorkers, partitions.Partitions); err != nil { return err } @@ -501,29 +497,23 @@ func QRepFlowWorkflow( return err } - logger.Info("partitions processed - ", len(partitions.Partitions)) + logger.Info(fmt.Sprintf("%d partitions processed", len(partitions.Partitions))) state.NumPartitionsProcessed += uint64(len(partitions.Partitions)) if len(partitions.Partitions) > 0 { state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } - if !state.DisableWaitForNewRows { - // sleep for a while and continue the workflow - err = q.waitForNewRows(ctx, state.LastPartition) - if err != nil { - return err - } + // sleep for a while and continue the workflow + err = q.waitForNewRows(ctx, signalChan, state.LastPartition) + if err != nil { + return err } logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) - q.receiveAndHandleSignalAsync(signalChan) - if err := ctx.Err(); err != nil { - return err - } if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index c5334221c..5d617f4af 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -100,14 +100,16 @@ func XminFlowWorkflow( Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } + // sleep for a while and continue the workflow + err = q.waitForNewRows(ctx, signalChan, state.LastPartition) + if err != nil { + return err + } + logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) - q.receiveAndHandleSignalAsync(signalChan) - if err := ctx.Err(); err != nil { - return err - } if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED } diff --git a/protos/flow.proto b/protos/flow.proto index f5d804ae7..0ab9b94a3 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -322,7 +322,7 @@ message QRepFlowState { QRepPartition last_partition = 1; uint64 num_partitions_processed = 2; bool needs_resync = 3; - bool disable_wait_for_new_rows = 4; + bool disable_wait_for_new_rows = 4; // deprecated FlowStatus current_flow_status = 5; } From 901ffab59e05722ab794ad4f73fd764a62bb231e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 01:06:38 +0000 Subject: [PATCH 08/14] remove comment that is now downplaying scope of method --- flow/workflows/qrep_flow.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index d37579f2f..9f2984b7c 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -504,7 +504,6 @@ func QRepFlowWorkflow( state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } - // sleep for a while and continue the workflow err = q.waitForNewRows(ctx, signalChan, state.LastPartition) if err != nil { return err From 74baf809df1fbd28679c94beac8e0a5fe83deac1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 03:44:15 +0000 Subject: [PATCH 09/14] PauseSelector logging --- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- flow/workflows/qrep_flow.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index fb49d3d24..849e74024 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -347,7 +347,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { query, e2e.GeneratePostgresPeer(), "", - true, + false, "_PEERDB_SYNCED_AT", ) require.NoError(s.t, err) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 9f2984b7c..26dd26f7e 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -307,6 +307,10 @@ func (q *QRepFlowExecution) waitForNewRows( selector.Select(ctx) } + q.logger.Info("PauseSelector finished", slog.Bool("done", done), + slog.Any("activeSignal", model.FlowSignalHandler), + slog.Any("doneErr", doneErr), + slog.Any("ctxErr", ctx.Err())) if err := ctx.Err(); err != nil { return err } @@ -511,7 +515,7 @@ func QRepFlowWorkflow( logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), - slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) + slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed)) if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED From 7de2363370eb29f7aaddf7141493ed0797366b20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 04:02:40 +0000 Subject: [PATCH 10/14] do not flip that bool --- flow/e2e/congen.go | 34 +++++++++++--------------- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- flow/e2e/test_utils.go | 5 +--- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 603c6f0b7..82387c9ff 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -218,25 +218,19 @@ type QRepFlowConnectionGenerationConfig struct { // GenerateQRepConfig generates a qrep config for testing. func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( query string, watermark string, -) (*protos.QRepConfig, error) { - ret := &protos.QRepConfig{} - ret.FlowJobName = c.FlowJobName - ret.WatermarkTable = c.WatermarkTable - ret.DestinationTableIdentifier = c.DestinationTableIdentifier - - postgresPeer := GeneratePostgresPeer() - ret.SourcePeer = postgresPeer - - ret.DestinationPeer = c.Destination - - ret.Query = query - ret.WatermarkColumn = watermark - - ret.StagingPath = c.StagingPath - ret.WriteMode = &protos.QRepWriteMode{ - WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, +) *protos.QRepConfig { + return &protos.QRepConfig{ + FlowJobName: c.FlowJobName, + WatermarkTable: c.WatermarkTable, + DestinationTableIdentifier: c.DestinationTableIdentifier, + SourcePeer: GeneratePostgresPeer(), + DestinationPeer: c.Destination, + Query: query, + WatermarkColumn: watermark, + StagingPath: c.StagingPath, + WriteMode: &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, + }, + NumRowsPerPartition: 1000, } - ret.NumRowsPerPartition = 1000 - - return ret, nil } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 849e74024..fb49d3d24 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -347,7 +347,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { query, e2e.GeneratePostgresPeer(), "", - false, + true, "_PEERDB_SYNCED_AT", ) require.NoError(s.t, err) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 92d04d1f3..8592eb406 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -396,10 +396,7 @@ func CreateQRepWorkflowConfig( watermark := "updated_at" - qrepConfig, err := connectionGen.GenerateQRepConfig(query, watermark) - if err != nil { - return nil, err - } + qrepConfig := connectionGen.GenerateQRepConfig(query, watermark) qrepConfig.InitialCopyOnly = true qrepConfig.SyncedAtColName = syncedAtCol qrepConfig.SetupWatermarkTableOnDestination = setupDst From 09fdcbe369f22e83e6fb3f941702e50e07fa356e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 04:12:25 +0000 Subject: [PATCH 11/14] fix log, fix ! --- flow/workflows/qrep_flow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 26dd26f7e..b77e36b1b 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -303,12 +303,12 @@ func (q *QRepFlowExecution) waitForNewRows( q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) }) - for ctx.Err() != nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) { + for ctx.Err() == nil && ((!done && q.activeSignal != model.PauseSignal) || selector.HasPending()) { selector.Select(ctx) } q.logger.Info("PauseSelector finished", slog.Bool("done", done), - slog.Any("activeSignal", model.FlowSignalHandler), + slog.Any("activeSignal", q.activeSignal), slog.Any("doneErr", doneErr), slog.Any("ctxErr", ctx.Err())) if err := ctx.Err(); err != nil { From 19b0da2da92a7c0160ad5e12835c76c4e7f975d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 04:25:53 +0000 Subject: [PATCH 12/14] set status when breaking from pause --- flow/workflows/qrep_flow.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index b77e36b1b..0b33f4ef4 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -452,6 +452,7 @@ func QRepFlowWorkflow( return err } } + state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING } maxParallelWorkers := 16 From d65949465b8cb7360a2fe47119a8ff1b408b84fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 04:35:34 +0000 Subject: [PATCH 13/14] cleanup, also fix xmin --- flow/workflows/qrep_flow.go | 6 +----- flow/workflows/xmin_flow.go | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 0b33f4ef4..839eab9dd 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -86,7 +86,7 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { } func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName string) (*protos.TableSchema, error) { - q.logger.Info("fetching schema for table - ", tableName) + q.logger.Info("fetching schema for table", slog.String("table", tableName)) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, @@ -307,10 +307,6 @@ func (q *QRepFlowExecution) waitForNewRows( selector.Select(ctx) } - q.logger.Info("PauseSelector finished", slog.Bool("done", done), - slog.Any("activeSignal", q.activeSignal), - slog.Any("doneErr", doneErr), - slog.Any("ctxErr", ctx.Err())) if err := ctx.Err(); err != nil { return err } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 5d617f4af..94733b40c 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -47,6 +47,7 @@ func XminFlowWorkflow( return err } } + state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING } err = q.SetupWatermarkTableOnDestination(ctx) @@ -100,7 +101,6 @@ func XminFlowWorkflow( Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } - // sleep for a while and continue the workflow err = q.waitForNewRows(ctx, signalChan, state.LastPartition) if err != nil { return err @@ -108,7 +108,7 @@ func XminFlowWorkflow( logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition), - slog.Any("Number of Partitions Processed", state.NumPartitionsProcessed)) + slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed)) if q.activeSignal == model.PauseSignal { state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED From e3f9ff98a1185110dba15e8e8dd46f07349d2fe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 05:36:29 +0000 Subject: [PATCH 14/14] xmin should not waitForNewRows --- flow/workflows/xmin_flow.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 94733b40c..777daba38 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -101,10 +101,16 @@ func XminFlowWorkflow( Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } - err = q.waitForNewRows(ctx, signalChan, state.LastPartition) - if err != nil { + if err := ctx.Err(); err != nil { return err } + for { + val, ok := signalChan.ReceiveAsync() + if !ok { + break + } + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + } logger.Info("Continuing as new workflow", slog.Any("Last Partition", state.LastPartition),