Skip to content

Commit

Permalink
improve last sync state
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 24, 2023
1 parent 1509630 commit 79bbbeb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
10 changes: 10 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

lastSyncState, err := a.GetLastSyncedID(ctx, &protos.GetLastSyncedIDInput{
PeerConnectionConfig: conn.Destination,
FlowJobName: conn.FlowJobName,
})
if err != nil {
return nil, fmt.Errorf("failed to get last synced ID: %w", err)
}

input.LastSyncState = lastSyncState

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (p *PostgresCDCSource) consumeStream(

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
log.Infof("[%s] Sent Standby status message. %s", req.FlowJobName, numRowsProcessedMessage)
standByLastLogged = time.Now()
}

Expand Down

0 comments on commit 79bbbeb

Please sign in to comment.