diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index e83010a82b..952650f303 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -759,7 +759,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // if job is not yet found in the peerdb_mirror_jobs_table // OR sync is lagging end normalize - if !hasJob || normalizeBatchID == syncBatchID { + if !hasJob || normalizeBatchID >= syncBatchID { c.logger.Info("waiting for sync to catch up, so finishing") return &model.NormalizeResponse{ Done: false, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 4af7fe5f34..211b29ee12 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -419,7 +419,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, err } // normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded. - if syncBatchID == normalizeBatchID || !jobMetadataExists { + if normalizeBatchID >= syncBatchID || !jobMetadataExists { c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", syncBatchID, normalizeBatchID)) return &model.NormalizeResponse{ diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index cccb24b528..b247626ab0 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -596,7 +596,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest return nil, err } // normalize has caught up with sync, chill until more records are loaded. - if syncBatchID == normalizeBatchID { + if normalizeBatchID >= syncBatchID { return &model.NormalizeResponse{ Done: false, StartBatchID: normalizeBatchID,