Skip to content

Commit

Permalink
fixed bug with monitoring post idle, printing upsert statements
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 9, 2023
1 parent 4ae89de commit 54a372e
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 7 deletions.
10 changes: 6 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,12 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
if res.Done {
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
}
}

// log the number of batches normalized
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
if !hasJob || normalizeBatchID == syncBatchID {
log.Printf("waiting for sync to catch up for job %s, so finishing", req.FlowJobName)
return &model.NormalizeResponse{
Done: true,
Done: false,
StartBatchID: normalizeBatchID,
EndBatchID: syncBatchID,
}, nil
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), internalSchema,
rawTableIdentifier, destinationTableIdentifier, deleteWhereClauseSQL)

log.Errorln("fallbackUpsertStatement", fallbackUpsertStatement)
log.Errorln("fallbackDeleteStatement", fallbackDeleteStatement)
return []string{fallbackUpsertStatement, fallbackDeleteStatement}
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
"flowName": req.FlowJobName,
}).Printf("no records to normalize: syncBatchID %d, normalizeBatchID %d", syncBatchID, normalizeBatchID)
return &model.NormalizeResponse{
Done: true,
Done: false,
StartBatchID: normalizeBatchID,
EndBatchID: syncBatchID,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
// normalize has caught up with sync, chill until more records are loaded.
if syncBatchID == normalizeBatchID {
return &model.NormalizeResponse{
Done: true,
Done: false,
StartBatchID: normalizeBatchID,
EndBatchID: syncBatchID,
}, nil
Expand Down

0 comments on commit 54a372e

Please sign in to comment.