diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7ad57fd12d..55710ae642 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -410,6 +410,7 @@ func (a *FlowableActivity) SyncFlow( logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) lastCheckpoint := recordBatch.GetLastCheckpoint() + srcConn.UpdateReplStateLastOffset(lastCheckpoint) err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, diff --git a/flow/connectors/core.go b/flow/connectors/core.go index b374ff70e4..a8e39b67b1 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -54,6 +54,9 @@ type CDCPullConnector interface { // This method should be idempotent, and should be able to be called multiple times with the same request. PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error + // Called when offset has been confirmed to destination + UpdateReplStateLastOffset(lastOffset int64) + // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(ctx context.Context, jobName string) error diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f82b867f2b..1bba4e8eca 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -8,6 +8,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -49,6 +50,7 @@ type ReplState struct { Slot string Publication string Offset int64 + LastOffset atomic.Int64 } func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { @@ -133,7 +135,7 @@ func (c *PostgresConnector) ReplPing(ctx context.Context) error { return pglogrepl.SendStandbyStatusUpdate( ctx, c.replConn.PgConn(), - pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)}, + pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.LastOffset.Load())}, ) } } @@ -184,7 +186,9 @@ func (c *PostgresConnector) MaybeStartReplication( Slot: slotName, Publication: publicationName, Offset: req.LastOffset, + LastOffset: atomic.Int64{}, } + c.replState.LastOffset.Store(req.LastOffset) } return nil } @@ -308,6 +312,9 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { defer func() { req.RecordStream.Close() + if c.replState != nil { + c.replState.Offset = req.RecordStream.GetLastCheckpoint() + } }() // Slotname would be the job name prefixed with "peerflow_slot_" @@ -371,9 +378,6 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo return err } - req.RecordStream.Close() - c.replState.Offset = req.RecordStream.GetLastCheckpoint() - latestLSN, err := c.getCurrentLSN(ctx) if err != nil { c.logger.Error("error getting current LSN", slog.Any("error", err)) @@ -389,6 +393,12 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo return nil } +func (c *PostgresConnector) UpdateReplStateLastOffset(lastOffset int64) { + if c.replState != nil { + c.replState.LastOffset.Store(lastOffset) + } +} + // SyncRecords pushes records to the destination. func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)