diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index d29981f703..9a86549097 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -88,7 +88,12 @@ func (a *FlowableActivity) GetLastSyncedID(
     }
     defer connectors.CloseConnector(dstConn)
 
-    return dstConn.GetLastOffset(config.FlowJobName)
+    var lastOffset int64
+    lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
+    if err != nil {
+        return nil, err
+    }
+    return &protos.LastSyncState{Checkpoint: lastOffset}, nil
 }
 
 // EnsurePullability implements EnsurePullability.
@@ -252,7 +257,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         FlowJobName: input.FlowConnectionConfigs.FlowJobName,
         SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
         TableNameMapping: tblNameMapping,
-        LastSyncState: input.LastSyncState,
+        LastOffset: input.LastSyncState.Checkpoint,
         MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
         IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
         TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index eb6ac8d198..e1f57e9c28 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -315,30 +315,27 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
     return nil
 }
 
-// GetLastOffset returns the last synced ID.
-func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
+func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
     query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'",
         c.datasetID, MirrorJobsTable, jobName)
     q := c.client.Query(query)
     it, err := q.Read(c.ctx)
     if err != nil {
         err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
-        return nil, err
+        return 0, err
     }
 
     var row []bigquery.Value
     err = it.Next(&row)
     if err != nil {
         c.logger.Info("no row found, returning nil")
-        return nil, nil
+        return 0, nil
     }
 
     if row[0] == nil {
         c.logger.Info("no offset found, returning nil")
-        return nil, nil
+        return 0, nil
     } else {
-        return &protos.LastSyncState{
-            Checkpoint: row[0].(int64),
-        }, nil
+        return row[0].(int64), nil
     }
 }
@@ -497,7 +494,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
     if err != nil {
         return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
     }
-    syncBatchID = syncBatchID + 1
+    syncBatchID += 1
 
     res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
     if err != nil {
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 6c482f852a..2845e371d0 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -59,7 +59,7 @@ type CDCSyncConnector interface {
     SetupMetadataTables() error
 
     // GetLastOffset gets the last offset from the metadata table on the destination
-    GetLastOffset(jobName string) (*protos.LastSyncState, error)
+    GetLastOffset(jobName string) (int64, error)
 
     // GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
     GetLastSyncBatchID(jobName string) (int64, error)
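// Editor's sketch (not part of the patch): with CDCSyncConnector.GetLastOffset
// narrowed from *protos.LastSyncState to a plain int64, a caller that still needs
// the proto wraps the value itself, exactly as GetLastSyncedID does in flowable.go
// above. offsetGetter and lastSyncStateOf are hypothetical names; protos.LastSyncState
// is the generated type this repo already uses.
type offsetGetter interface {
    GetLastOffset(jobName string) (int64, error)
}

func lastSyncStateOf(dst offsetGetter, jobName string) (*protos.LastSyncState, error) {
    offset, err := dst.GetLastOffset(jobName)
    if err != nil {
        return nil, err
    }
    return &protos.LastSyncState{Checkpoint: offset}, nil
}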
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 11679e1dab..01b8510e42 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error {
 }
 
 func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
-    syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
-    if err != nil {
-        return 0, err
-    }
-
-    return syncBatchID, nil
+    return c.pgMetadata.GetLastBatchID(jobName)
 }
 
-func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
-    res, err := c.pgMetadata.FetchLastOffset(jobName)
-    if err != nil {
-        return nil, err
-    }
-
-    return res, nil
+func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
+    return c.pgMetadata.FetchLastOffset(jobName)
 }
 
 func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index ddfd6b63b0..e2127b3099 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
     return nil
 }
 
-func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
+func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
     rows := p.pool.QueryRow(p.ctx, `
         SELECT last_offset
         FROM `+p.schemaName+`.`+lastSyncStateTableName+`
@@ -147,20 +147,16 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
     if err != nil {
         // if the job doesn't exist, return 0
         if err.Error() == "no rows in result set" {
-            return &protos.LastSyncState{
-                Checkpoint: 0,
-            }, nil
+            return 0, nil
         }
 
         p.logger.Error("failed to get last offset", slog.Any("error", err))
-        return nil, err
+        return 0, err
     }
 
     p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64))
 
-    return &protos.LastSyncState{
-        Checkpoint: offset.Int64,
-    }, nil
+    return offset.Int64, nil
 }
 
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index cf75fb2e2c..a56ee49edf 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -143,9 +143,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
 
     // start replication
     p.startLSN = 0
-    if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 {
-        p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint))
-        p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1)
+    if req.LastOffset > 0 {
+        p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
+        p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
     }
 
     err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
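// Editor's sketch (not part of the patch): the convention the changes above converge
// on. FetchLastOffset reports "no row yet" as 0 instead of a nil *protos.LastSyncState,
// and PullRecords treats any positive LastOffset as a checkpoint to resume after.
// computeStartLSN is a hypothetical helper; pglogrepl.LSN is the github.com/jackc/pglogrepl
// type already used in cdc.go.
func computeStartLSN(lastOffset int64) pglogrepl.LSN {
    if lastOffset > 0 {
        // resume one position past the last confirmed checkpoint, as cdc.go does
        return pglogrepl.LSN(lastOffset + 1)
    }
    // 0 means no prior sync; replication starts from the slot's default position
    return 0
}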
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index e5bec86b06..5ac489a6df 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -168,31 +168,28 @@ func (c *PostgresConnector) SetupMetadataTables() error {
 }
 
 // GetLastOffset returns the last synced offset for a job.
-func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
+func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
     rows, err := c.pool.
         Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
     if err != nil {
-        return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
+        return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
     }
     defer rows.Close()
     if !rows.Next() {
         c.logger.Info("No row found, returning nil")
-        return nil, nil
+        return 0, nil
     }
     var result pgtype.Int8
     err = rows.Scan(&result)
     if err != nil {
-        return nil, fmt.Errorf("error while reading result row: %w", err)
+        return 0, fmt.Errorf("error while reading result row: %w", err)
     }
 
     if result.Int64 == 0 {
-        c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
-        return nil, nil
+        c.logger.Warn("Assuming zero offset means no sync has happened")
     }
 
-    return &protos.LastSyncState{
-        Checkpoint: result.Int64,
-    }, nil
+    return result.Int64, nil
 }
 
 // PullRecords pulls records from the source.
@@ -273,7 +270,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
     if err != nil {
         return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
     }
-    syncBatchID = syncBatchID + 1
+    syncBatchID += 1
 
     records := make([][]interface{}, 0)
     tableNameRowsMapping := make(map[string]uint32)
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index 7dd102a305..6707d96200 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error {
 }
 
 func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
-    syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
-    if err != nil {
-        return 0, err
-    }
-
-    return syncBatchID, nil
+    return c.pgMetadata.GetLastBatchID(jobName)
 }
 
-func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
-    res, err := c.pgMetadata.FetchLastOffset(jobName)
-    if err != nil {
-        return nil, err
-    }
-
-    return res, nil
+func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
+    return c.pgMetadata.FetchLastOffset(jobName)
 }
 
 // update offset for a job
@@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
     if err != nil {
         return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
    }
-    syncBatchID = syncBatchID + 1
+    syncBatchID += 1
 
     tableNameRowsMapping := make(map[string]uint32)
     streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
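// Editor's sketch (not part of the patch): a minimal in-memory stand-in for the
// metadata lookups that the S3 and EventHub connectors above now delegate to
// PostgresMetadataStore. Under the new contract a job that has never synced yields
// offset 0 with no error, so the Go zero value of a map lookup is enough.
// fakeMetadataStore is hypothetical, intended for tests or illustration only.
type fakeMetadataStore struct {
    offsets map[string]int64
    batches map[string]int64
}

func (s *fakeMetadataStore) FetchLastOffset(jobName string) (int64, error) {
    return s.offsets[jobName], nil // missing job -> 0, i.e. "no sync yet"
}

func (s *fakeMetadataStore) GetLastBatchID(jobName string) (int64, error) {
    return s.batches[jobName], nil
}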
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 1d7dcba312..b1dcbb3c62 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -284,11 +284,11 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
     return res, nil
 }
 
-func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
+func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
     rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema,
         mirrorJobsTableIdentifier), jobName)
     if err != nil {
-        return nil, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
+        return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
     }
     defer func() {
         // not sure if the errors these two return are same or different?
@@ -300,20 +300,17 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncStat
 
     if !rows.Next() {
         c.logger.Warn("No row found ,returning nil")
-        return nil, nil
+        return 0, nil
     }
     var result pgtype.Int8
     err = rows.Scan(&result)
     if err != nil {
-        return nil, fmt.Errorf("error while reading result row: %w", err)
+        return 0, fmt.Errorf("error while reading result row: %w", err)
     }
     if result.Int64 == 0 {
-        c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
-        return nil, nil
+        c.logger.Warn("Assuming zero offset means no sync has happened")
     }
-    return &protos.LastSyncState{
-        Checkpoint: result.Int64,
-    }, nil
+    return result.Int64, nil
 }
 
 func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
@@ -496,7 +493,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
     if err != nil {
         return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
     }
-    syncBatchID = syncBatchID + 1
+    syncBatchID += 1
 
     res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
     if err != nil {
diff --git a/flow/model/model.go b/flow/model/model.go
index 320ad47e61..02949f3c2e 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -30,13 +30,13 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude {
 type PullRecordsRequest struct {
     // FlowJobName is the name of the flow job.
     FlowJobName string
-    // LastSyncedID is the last ID that was synced.
-    LastSyncState *protos.LastSyncState
+    // LastOffset is the latest LSN that was synced.
+    LastOffset int64
     // MaxBatchSize is the max number of records to fetch.
     MaxBatchSize uint32
     // IdleTimeout is the timeout to wait for new records.
     IdleTimeout time.Duration
-    //relId to name Mapping
+    // relId to name Mapping
     SrcTableIDNameMapping map[uint32]string
     // source to destination table name mapping
     TableNameMapping map[string]NameAndExclude
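// Editor's sketch (not part of the patch): how a PullRecordsRequest is populated now
// that the struct carries the raw LastOffset int64 instead of a *protos.LastSyncState,
// mirroring what StartFlow does in flowable.go above. newPullRequest and the literal
// values are hypothetical placeholders; model and time are packages the flow code
// already imports.
func newPullRequest(jobName string, lastOffset int64) *model.PullRecordsRequest {
    return &model.PullRecordsRequest{
        FlowJobName:  jobName,
        LastOffset:   lastOffset, // 0 means "no previous sync"; cdc.go then starts from scratch
        MaxBatchSize: 1000,
        IdleTimeout:  time.Minute,
    }
}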