From 0c8ef08a4baf73a2506b971c86182404d891e0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 7 May 2024 13:07:10 +0000 Subject: [PATCH 1/2] fix ci --- flow/activities/flowable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7571c74b55..072c52f3c3 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -643,7 +643,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - if config.SourcePeer.Type != protos.DBType_POSTGRES { + if config.SourcePeer.Type != protos.DBType_POSTGRES || last == nil || last.Range == nil { return QRepWaitUntilNewRowsResult{Found: true}, nil } From 71e876ad6786b53a9ad677c2d42e0f8dbc8aef46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 7 May 2024 13:18:03 +0000 Subject: [PATCH 2/2] correct fix --- flow/activities/flowable.go | 2 +- flow/workflows/qrep_flow.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 072c52f3c3..7571c74b55 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -643,7 +643,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - if config.SourcePeer.Type != protos.DBType_POSTGRES || last == nil || last.Range == nil { + if config.SourcePeer.Type != protos.DBType_POSTGRES { return QRepWaitUntilNewRowsResult{Found: true}, nil } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index a988e1f96c..8be2619866 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -570,7 +570,7 @@ func QRepFlowWorkflow( return err } - if state.LastPartition != nil { + if !config.InitialCopyOnly && state.LastPartition != nil { if err := q.waitForNewRows(ctx, signalChan, state.LastPartition); err != nil { return err }