diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 918222d2a9..5121743acc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -384,11 +384,7 @@ func (a *FlowableActivity) StartFlow( logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds()))) - lastCheckpoint, err := recordBatch.GetLastCheckpoint() - if err != nil { - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, fmt.Errorf("failed to get last checkpoint: %w", err) - } + lastCheckpoint := pglogrepl.LSN(recordBatch.GetLastCheckpoint()) err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, @@ -396,7 +392,7 @@ func (a *FlowableActivity) StartFlow( input.FlowConnectionConfigs.FlowJobName, res.CurrentSyncBatchID, uint32(numRecords), - pglogrepl.LSN(lastCheckpoint), + lastCheckpoint, ) if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) @@ -407,7 +403,7 @@ func (a *FlowableActivity) StartFlow( ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, - pglogrepl.LSN(lastCheckpoint), + lastCheckpoint, ) if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 60b6400c75..d69642028c 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,10 +74,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;", rawTableName, stagingTable) - lastCP, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, fmt.Errorf("failed to get last checkpoint: %w", err) - } + lastCP := req.Records.GetLastCheckpoint() activity.RecordHeartbeat(ctx, fmt.Sprintf("Flow job %s: performing insert and update transaction"+ diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 3074df7bad..1d8e4786c9 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -107,13 +107,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( return nil, fmt.Errorf("failed to sync schema changes: %w", err) } - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, err - } - return &model.SyncResponse{ - LastSyncedCheckpointID: lastCheckpoint, + LastSyncedCheckpointID: req.Records.GetLastCheckpoint(), NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, @@ -130,12 +125,7 @@ func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe return nil, err } - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, fmt.Errorf("failed to get last checkpoint: %w", err) - } - - err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint) + err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, req.Records.GetLastCheckpoint()) if err != nil { c.logger.Error("failed to increment id", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 55d2a4b6c4..7a221fa213 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -208,11 +208,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco return nil, err } - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - c.logger.Error("failed to get last checkpoint", slog.Any("error", err)) - return nil, err - } + lastCheckpoint := req.Records.GetLastCheckpoint() err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint) if err != nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 319fc65b54..d6cbdbe45f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -46,7 +46,7 @@ type PostgresConnector struct { type ReplState struct { Slot string Publication string - Offset pglogrepl.LSN + Offset int64 } // NewPostgresConnector creates a new instance of PostgresConnector. @@ -139,7 +139,7 @@ func (c *PostgresConnector) ReplPing(ctx context.Context) error { return pglogrepl.SendStandbyStatusUpdate( ctx, c.replConn.PgConn(), - pglogrepl.StandbyStatusUpdate{WALWritePosition: c.replState.Offset}, + pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)}, ) } } @@ -152,11 +152,11 @@ func (c *PostgresConnector) MaybeStartReplication( publicationName string, req *model.PullRecordsRequest, ) error { - if c.replState != nil && (int64(c.replState.Offset) != req.LastOffset || + if c.replState != nil && (c.replState.Offset != req.LastOffset || c.replState.Slot != slotName || c.replState.Publication != publicationName) { return fmt.Errorf("replState changed, reset connector. slot name: old=%s new=%s, publication: old=%s new=%s, offset: old=%d new=%d", - c.replState.Slot, slotName, c.replState.Publication, publicationName, int64(c.replState.Offset), req.LastOffset, + c.replState.Slot, slotName, c.replState.Publication, publicationName, c.replState.Offset, req.LastOffset, ) } @@ -187,7 +187,7 @@ func (c *PostgresConnector) MaybeStartReplication( c.replState = &ReplState{ Slot: slotName, Publication: publicationName, - Offset: pglogrepl.LSN(req.LastOffset), + Offset: req.LastOffset, } } return nil @@ -366,12 +366,13 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo c.logger.Warn(fmt.Sprintf("FAILFAIL PullRecords %s", err)) return err } + req.RecordStream.Close() + c.replState.Offset = req.RecordStream.GetLastCheckpoint() latestLSN, err := c.getCurrentLSN(ctx) if err != nil { return fmt.Errorf("failed to get current LSN: %w", err) } - c.replState.Offset = latestLSN err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, latestLSN) if err != nil { @@ -504,10 +505,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier)) - lastCP, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, fmt.Errorf("error getting last checkpoint: %w", err) - } + lastCP := req.Records.GetLastCheckpoint() // updating metadata with new offset and syncBatchID err = c.updateSyncMetadata(ctx, req.FlowJobName, lastCP, req.SyncBatchID, syncRecordsTx) @@ -1103,7 +1101,6 @@ cc: `, return nil } -// GetLastOffset returns the last synced offset for a job. func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) { row := conn.QueryRow(ctx, getNumConnectionsForUser, user) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 930f8f2204..23db82c0b2 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -184,10 +184,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq } c.logger.Info(fmt.Sprintf("Synced %d records", numRecords)) - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, fmt.Errorf("failed to get last checkpoint: %w", err) - } + lastCheckpoint := req.Records.GetLastCheckpoint() err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint) if err != nil { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4a6d47434b..3a55556423 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -472,13 +472,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( return nil, fmt.Errorf("failed to sync schema changes: %w", err) } - lastCheckpoint, err := req.Records.GetLastCheckpoint() - if err != nil { - return nil, err - } - return &model.SyncResponse{ - LastSyncedCheckpointID: lastCheckpoint, + LastSyncedCheckpointID: req.Records.GetLastCheckpoint(), NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, diff --git a/flow/model/model.go b/flow/model/model.go index 14de42a44e..a1524eded5 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -455,11 +455,11 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) { } } -func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) { +func (r *CDCRecordStream) GetLastCheckpoint() int64 { if !r.lastCheckpointSet { - return 0, errors.New("last checkpoint not set, stream is still active") + panic("last checkpoint not set, stream is still active") } - return r.lastCheckpointID.Load(), nil + return r.lastCheckpointID.Load() } func (r *CDCRecordStream) AddRecord(record Record) { @@ -500,9 +500,11 @@ func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExcl } func (r *CDCRecordStream) Close() { - close(r.emptySignal) - close(r.records) - r.lastCheckpointSet = true + if !r.lastCheckpointSet { + close(r.emptySignal) + close(r.records) + r.lastCheckpointSet = true + } } func (r *CDCRecordStream) GetRecords() <-chan Record {