Skip to content

Commit

Permalink
We want last checkpoint, not current lsn
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 12, 2024
1 parent 727872e commit feea44b
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 55 deletions.
10 changes: 3 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,15 @@ func (a *FlowableActivity) StartFlow(

logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCheckpoint := pglogrepl.LSN(recordBatch.GetLastCheckpoint())

err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand All @@ -407,7 +403,7 @@ func (a *FlowableActivity) StartFlow(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
rawTableName, stagingTable)

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCP := req.Records.GetLastCheckpoint()

activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
Expand Down
14 changes: 2 additions & 12 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCheckpoint,
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand All @@ -130,12 +125,7 @@ func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, req.Records.GetLastCheckpoint())
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
c.logger.Error("failed to get last checkpoint", slog.Any("error", err))
return nil, err
}
lastCheckpoint := req.Records.GetLastCheckpoint()

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
Expand Down
19 changes: 8 additions & 11 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type PostgresConnector struct {
type ReplState struct {
Slot string
Publication string
Offset pglogrepl.LSN
Offset int64
}

// NewPostgresConnector creates a new instance of PostgresConnector.
Expand Down Expand Up @@ -139,7 +139,7 @@ func (c *PostgresConnector) ReplPing(ctx context.Context) error {
return pglogrepl.SendStandbyStatusUpdate(
ctx,
c.replConn.PgConn(),
pglogrepl.StandbyStatusUpdate{WALWritePosition: c.replState.Offset},
pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)},
)
}
}
Expand All @@ -152,11 +152,11 @@ func (c *PostgresConnector) MaybeStartReplication(
publicationName string,
req *model.PullRecordsRequest,
) error {
if c.replState != nil && (int64(c.replState.Offset) != req.LastOffset ||
if c.replState != nil && (c.replState.Offset != req.LastOffset ||
c.replState.Slot != slotName ||
c.replState.Publication != publicationName) {
return fmt.Errorf("replState changed, reset connector. slot name: old=%s new=%s, publication: old=%s new=%s, offset: old=%d new=%d",
c.replState.Slot, slotName, c.replState.Publication, publicationName, int64(c.replState.Offset), req.LastOffset,
c.replState.Slot, slotName, c.replState.Publication, publicationName, c.replState.Offset, req.LastOffset,
)
}

Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *PostgresConnector) MaybeStartReplication(
c.replState = &ReplState{
Slot: slotName,
Publication: publicationName,
Offset: pglogrepl.LSN(req.LastOffset),
Offset: req.LastOffset,
}
}
return nil
Expand Down Expand Up @@ -366,12 +366,13 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
c.logger.Warn(fmt.Sprintf("FAILFAIL PullRecords %s", err))
return err
}
req.RecordStream.Close()
c.replState.Offset = req.RecordStream.GetLastCheckpoint()

latestLSN, err := c.getCurrentLSN(ctx)
if err != nil {
return fmt.Errorf("failed to get current LSN: %w", err)
}
c.replState.Offset = latestLSN

err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, latestLSN)
if err != nil {
Expand Down Expand Up @@ -504,10 +505,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
syncedRecordsCount, rawTableIdentifier))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("error getting last checkpoint: %w", err)
}
lastCP := req.Records.GetLastCheckpoint()

// updating metadata with new offset and syncBatchID
err = c.updateSyncMetadata(ctx, req.FlowJobName, lastCP, req.SyncBatchID, syncRecordsTx)
Expand Down Expand Up @@ -1103,7 +1101,6 @@ cc: <!channel>`,
return nil
}

// GetLastOffset returns the last synced offset for a job.
func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) {
row := conn.QueryRow(ctx, getNumConnectionsForUser, user)

Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq
}
c.logger.Info(fmt.Sprintf("Synced %d records", numRecords))

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCheckpoint := req.Records.GetLastCheckpoint()

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCheckpoint,
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand Down
14 changes: 8 additions & 6 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,11 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
}
}

func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
func (r *CDCRecordStream) GetLastCheckpoint() int64 {
if !r.lastCheckpointSet {
return 0, errors.New("last checkpoint not set, stream is still active")
panic("last checkpoint not set, stream is still active")
}
return r.lastCheckpointID.Load(), nil
return r.lastCheckpointID.Load()
}

func (r *CDCRecordStream) AddRecord(record Record) {
Expand Down Expand Up @@ -500,9 +500,11 @@ func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExcl
}

func (r *CDCRecordStream) Close() {
close(r.emptySignal)
close(r.records)
r.lastCheckpointSet = true
if !r.lastCheckpointSet {
close(r.emptySignal)
close(r.records)
r.lastCheckpointSet = true
}
}

func (r *CDCRecordStream) GetRecords() <-chan Record {
Expand Down

0 comments on commit feea44b

Please sign in to comment.