From e5c51da8efa6b309c8660af9a4193ceb3e5c5759 Mon Sep 17 00:00:00 2001 From: Yasin Zaehringer Date: Fri, 26 Apr 2024 18:34:26 +0100 Subject: [PATCH] qrep: if the last sync is nil, check the table if there are new rows. this bug can prevent tables which are empty to be synced once they get data. --- flow/activities/flowable.go | 2 +- flow/connectors/postgres/qrep.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7c3a41ac58..f2bee637b7 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -615,7 +615,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil { + if config.SourcePeer.Type != protos.DBType_POSTGRES { return QRepWaitUntilNewRowsResult{Found: true}, nil } diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 8ec936d2ea..0d298a193f 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -287,6 +287,10 @@ func (c *PostgresConnector) CheckForUpdatedMaxValue( return false, fmt.Errorf("error while getting min and max values: %w", err) } + if last == nil || last.Range == nil { + return maxValue != nil, nil + } + switch x := last.Range.Range.(type) { case *protos.PartitionRange_IntRange: if maxValue.(int64) > x.IntRange.End {