Skip to content

Commit

Permalink
Fix everything
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 14, 2023
1 parent 313c708 commit a4a4d61
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 72 deletions.
9 changes: 6 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
) (int64, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
return 0, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

Expand Down Expand Up @@ -251,14 +251,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
},
})
})

Expand Down
23 changes: 16 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,30 +316,39 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
}

// GetLastOffset returns the last synced ID.
func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return nil, err
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found, returning nil")
return nil, nil
return 0, nil
}

if row[0] == nil {
c.logger.Info("no offset found, returning nil")
return nil, nil
return 0, nil
} else {
return &protos.LastSyncState{
Checkpoint: row[0].(int64),
}, nil
return row[0].(int64), nil
}
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf("UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, lastOffset, jobName)
q := c.client.Query(query)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
}

return nil
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ type CDCSyncConnector interface {
SetupMetadataTables() error

// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (*protos.LastSyncState, error)
GetLastOffset(jobName string) (int64, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(jobName string, lastOffset int64) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
return syncBatchID, nil
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
return 0, err
}

return res, nil
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err))
Expand Down Expand Up @@ -197,7 +197,7 @@ func (c *EventHubConnector) processBatch(
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
err = c.SetLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
Expand Down
13 changes: 4 additions & 9 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
Expand All @@ -145,22 +145,17 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
var offset pgtype.Int8
err := rows.Scan(&offset)
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return &protos.LastSyncState{
Checkpoint: 0,
}, nil
return 0, nil
}

p.logger.Error("failed to get last offset", slog.Any("error", err))
return nil, err
return 0, err
}

p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64))

return &protos.LastSyncState{
Checkpoint: offset.Int64,
}, nil
return offset.Int64, nil
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
Expand Down
21 changes: 7 additions & 14 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (

type PostgresCDCSource struct {
ctx context.Context
catalogPool *pgxpool.Pool
replPool *pgxpool.Pool
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
metadataSchema string
SetLastOffset func(int64) error
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
Expand All @@ -43,14 +42,14 @@ type PostgresCDCSource struct {

type PostgresCDCConfig struct {
AppContext context.Context
CatalogPool *pgxpool.Pool
Connection *pgxpool.Pool
Slot string
MetadataSchema string
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
SetLastOffset func(int64) error
}

// Create a new PostgresCDCSource
Expand All @@ -63,12 +62,11 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
catalogPool: cdcConfig.CatalogPool,
replPool: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
metadataSchema: cdcConfig.MetadataSchema,
SetLastOffset: cdcConfig.SetLastOffset,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
Expand Down Expand Up @@ -148,9 +146,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {

// start replication
var clientXLogPos, startLSN pglogrepl.LSN
if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint))
clientXLogPos = pglogrepl.LSN(req.LastSyncState.Checkpoint)
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
clientXLogPos = pglogrepl.LSN(req.LastOffset)
startLSN = clientXLogPos + 1
}

Expand Down Expand Up @@ -184,12 +182,7 @@ func (p *PostgresCDCSource) consumeStream(
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos

_, err := p.catalogPool.Exec(
p.ctx,
fmt.Sprintf(updateMetadataForLsnSQL, p.metadataSchema, mirrorJobsTableIdentifier),
int64(consumedXLogPos),
req.FlowJobName,
)
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ const (
createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"

getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForLsnSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"

getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
Expand Down
29 changes: 18 additions & 11 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,31 +168,39 @@ func (c *PostgresConnector) SetupMetadataTables() error {
}

// GetLastOffset returns the last synced offset for a job.
func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.pool.
Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
}
defer rows.Close()

if !rows.Next() {
c.logger.Info("No row found, returning nil")
return nil, nil
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}

if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
c.logger.Warn("Assuming zero offset means no sync has happened")
}
return result.Int64, nil
}

return &protos.LastSyncState{
Checkpoint: result.Int64,
}, nil
// SetLastOffset updates the last synced offset for a job.
func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error {
_, err := c.pool.
Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName, lastOffset)
if err != nil {
return fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
}

return nil
}

// PullRecords pulls records from the source.
Expand Down Expand Up @@ -233,14 +241,13 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu

cdc, err := NewPostgresCDCSource(&PostgresCDCConfig{
AppContext: c.ctx,
CatalogPool: catalogPool,
Connection: c.replPool.Pool,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
MetadataSchema: c.metadataSchema,
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
SetLastOffset: req.SetLastOffset,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,17 @@ func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
return syncBatchID, nil
}

func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
return 0, err
}

return res, nil
}

// update offset for a job
func (c *S3Connector) updateLastOffset(jobName string, offset int64) error {
func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
Expand Down Expand Up @@ -227,7 +227,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
return nil, err
Expand Down
Loading

0 comments on commit a4a4d61

Please sign in to comment.