From d02873e6d69c4aaf77848032163dfde5a6fe01dd Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 23 Oct 2023 16:28:04 +0530 Subject: [PATCH] i have no idea how this got missed --- flow/activities/flowable.go | 3 +++ flow/workflows/qrep_flow.go | 25 ++++++++++++++++++------- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3342e2d008..7b5a7a173c 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -675,6 +675,9 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, defer connectors.CloseConnector(srcConn) pgSrcConn := srcConn.(*connpostgres.PostgresConnector) + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + }).Infof("current last partition value is %v\n", last) attemptCount := 1 for { activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount)) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 4debe11b3a..9a5a8641e8 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -238,6 +238,22 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { return nil } +func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition *protos.QRepPartition) error { + q.logger.Info("idling until new rows are detected") + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 16 * 365 * 24 * time.Hour, // 16 years + HeartbeatTimeout: 5 * time.Minute, + }) + + if err := workflow.ExecuteActivity(ctx, flowable.QRepWaitUntilNewRows, q.config, + lastPartition).Get(ctx, nil); err != nil { + return fmt.Errorf("failed while idling for new rows: %w", err) + } + + return nil +} + func QRepFlowWorkflow( ctx workflow.Context, config *protos.QRepConfig, @@ -257,11 +273,6 @@ func QRepFlowWorkflow( maxParallelWorkers = int(config.MaxParallelWorkers) } - waitBetweenBatches := 5 * time.Second - if config.WaitBetweenBatchesSeconds > 0 { - waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second - } - // register a signal handler to terminate the workflow terminateWorkflow := false signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) @@ -345,9 +356,9 @@ func QRepFlowWorkflow( } // sleep for a while and continue the workflow - err = workflow.Sleep(ctx, waitBetweenBatches) + err = q.waitForNewRows(ctx, lastPartition) if err != nil { - return fmt.Errorf("failed to sleep: %w", err) + return err } workflow.GetLogger(ctx).Info("Continuing as new workflow",