Skip to content

Commit

Permalink
Possible normalize is above syncBatchID, that's okay
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 23, 2023
1 parent d64288b commit 6af635e
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// if job is not yet found in the peerdb_mirror_jobs_table
// OR sync is lagging end normalize
if !hasJob || normalizeBatchID == syncBatchID {
if !hasJob || normalizeBatchID >= syncBatchID {
c.logger.Info("waiting for sync to catch up, so finishing")
return &model.NormalizeResponse{
Done: false,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, err
}
// normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded.
if syncBatchID == normalizeBatchID || !jobMetadataExists {
if normalizeBatchID >= syncBatchID || !jobMetadataExists {
c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
syncBatchID, normalizeBatchID))
return &model.NormalizeResponse{
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
return nil, err
}
// normalize has caught up with sync, chill until more records are loaded.
if syncBatchID == normalizeBatchID {
if normalizeBatchID >= syncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: normalizeBatchID,
Expand Down

0 comments on commit 6af635e

Please sign in to comment.