Skip to content

Commit

Permalink
GetLastOffset: return int64 (#825)
Browse files Browse the repository at this point in the history
Only keep `LastSyncState` around enough for backwards compatibility
  • Loading branch information
serprex authored Dec 14, 2023
1 parent 46208f1 commit 40d62cf
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 73 deletions.
9 changes: 7 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ func (a *FlowableActivity) GetLastSyncedID(
}
defer connectors.CloseConnector(dstConn)

return dstConn.GetLastOffset(config.FlowJobName)
var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

// EnsurePullability implements EnsurePullability.
Expand Down Expand Up @@ -252,7 +257,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
Expand Down
15 changes: 6 additions & 9 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,30 +315,27 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
return nil
}

// GetLastOffset returns the last synced ID.
func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return nil, err
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found, returning nil")
return nil, nil
return 0, nil
}

if row[0] == nil {
c.logger.Info("no offset found, returning nil")
return nil, nil
return 0, nil
} else {
return &protos.LastSyncState{
Checkpoint: row[0].(int64),
}, nil
return row[0].(int64), nil
}
}

Expand Down Expand Up @@ -497,7 +494,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type CDCSyncConnector interface {
SetupMetadataTables() error

// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (*protos.LastSyncState, error)
GetLastOffset(jobName string) (int64, error)

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)
Expand Down
16 changes: 3 additions & 13 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error {
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
Expand Down
12 changes: 4 additions & 8 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
Expand All @@ -147,20 +147,16 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return &protos.LastSyncState{
Checkpoint: 0,
}, nil
return 0, nil
}

p.logger.Error("failed to get last offset", slog.Any("error", err))
return nil, err
return 0, err
}

p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64))

return &protos.LastSyncState{
Checkpoint: offset.Int64,
}, nil
return offset.Int64, nil
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {

// start replication
p.startLSN = 0
if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint))
p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1)
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
Expand Down
17 changes: 7 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,31 +168,28 @@ func (c *PostgresConnector) SetupMetadataTables() error {
}

// GetLastOffset returns the last synced offset for a job.
func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.pool.
Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
}
defer rows.Close()

if !rows.Next() {
c.logger.Info("No row found, returning nil")
return nil, nil
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
c.logger.Warn("Assuming zero offset means no sync has happened")
}

return &protos.LastSyncState{
Checkpoint: result.Int64,
}, nil
return result.Int64, nil
}

// PullRecords pulls records from the source.
Expand Down Expand Up @@ -273,7 +270,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1
records := make([][]interface{}, 0)
tableNameRowsMapping := make(map[string]uint32)

Expand Down
18 changes: 4 additions & 14 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error {
}

func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

// update offset for a job
Expand All @@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

tableNameRowsMapping := make(map[string]uint32)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
Expand Down
17 changes: 7 additions & 10 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
return res, nil
}

func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return nil, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
defer func() {
// not sure if the errors these two return are same or different?
Expand All @@ -300,20 +300,17 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncStat

if !rows.Next() {
c.logger.Warn("No row found ,returning nil")
return nil, nil
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
c.logger.Warn("Assuming zero offset means no sync has happened")
}
return &protos.LastSyncState{
Checkpoint: result.Int64,
}, nil
return result.Int64, nil
}

func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
Expand Down Expand Up @@ -496,7 +493,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude {
type PullRecordsRequest struct {
// FlowJobName is the name of the flow job.
FlowJobName string
// LastSyncedID is the last ID that was synced.
LastSyncState *protos.LastSyncState
// LastOffset is the latest LSN that was synced.
LastOffset int64
// MaxBatchSize is the max number of records to fetch.
MaxBatchSize uint32
// IdleTimeout is the timeout to wait for new records.
IdleTimeout time.Duration
//relId to name Mapping
// relId to name Mapping
SrcTableIDNameMapping map[uint32]string
// source to destination table name mapping
TableNameMapping map[string]NameAndExclude
Expand Down

0 comments on commit 40d62cf

Please sign in to comment.