Skip to content

Commit

Permalink
Merge branch 'main' into refactor-job-metadata-exists
Browse files: browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 14, 2023
2 parents c6431b3 + 1537991 commit 363e7d4
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 92 deletions.
13 changes: 8 additions & 5 deletions flow/activities/flowable.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -88,7 +88,12 @@ func (a *FlowableActivity) GetLastSyncedID(
}
defer connectors.CloseConnector(dstConn)

return dstConn.GetLastOffset(config.FlowJobName)
var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

// EnsurePullability implements EnsurePullability.
Expand All @@ -115,7 +120,6 @@ func (a *FlowableActivity) CreateRawTable(
ctx context.Context,
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down Expand Up @@ -210,7 +214,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -248,11 +251,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
Expand Down
15 changes: 6 additions & 9 deletions flow/connectors/bigquery/bigquery.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -315,30 +315,27 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
return nil
}

// GetLastOffset returns the last synced ID.
func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
return nil, err
return 0, err
}

var row []bigquery.Value
err = it.Next(&row)
if err != nil {
c.logger.Info("no row found, returning nil")
return nil, nil
return 0, nil
}

if row[0] == nil {
c.logger.Info("no offset found, returning nil")
return nil, nil
return 0, nil
} else {
return &protos.LastSyncState{
Checkpoint: row[0].(int64),
}, nil
return row[0].(int64), nil
}
}

Expand Down Expand Up @@ -497,7 +494,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/core.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pgx/v5/pgxpool"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand All @@ -37,7 +38,7 @@ type CDCPullConnector interface {

// PullRecords pulls records from the source, and returns a RecordBatch.
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(req *model.PullRecordsRequest) error
PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error
Expand All @@ -59,7 +60,7 @@ type CDCSyncConnector interface {
SetupMetadataTables() error

// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (*protos.LastSyncState, error)
GetLastOffset(jobName string) (int64, error)

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)
Expand Down
16 changes: 3 additions & 13 deletions flow/connectors/eventhub/eventhub.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error {
}

func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
Expand Down
12 changes: 4 additions & 8 deletions flow/connectors/external_metadata/store.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) {
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
Expand All @@ -147,20 +147,16 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return &protos.LastSyncState{
Checkpoint: 0,
}, nil
return 0, nil
}

p.logger.Error("failed to get last offset", slog.Any("error", err))
return nil, err
return 0, err
}

p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64))

return &protos.LastSyncState{
Checkpoint: offset.Int64,
}, nil
return offset.Int64, nil
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -143,9 +143,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {

// start replication
p.startLSN = 0
if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint))
p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1)
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
Expand Down
36 changes: 15 additions & 21 deletions flow/connectors/postgres/postgres.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -168,35 +168,32 @@ func (c *PostgresConnector) SetupMetadataTables() error {
}

// GetLastOffset returns the last synced offset for a job.
func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.pool.
Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
}
defer rows.Close()

if !rows.Next() {
c.logger.Info("No row found, returning nil")
return nil, nil
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
c.logger.Warn("Assuming zero offset means no sync has happened")
}

return &protos.LastSyncState{
Checkpoint: result.Int64,
}, nil
return result.Int64, nil
}

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
req.RecordStream.Close()
}()
Expand Down Expand Up @@ -249,16 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
return err
}

catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool)
if ok {
latestLSN, err := c.getCurrentLSN()
if err != nil {
return fmt.Errorf("failed to get current LSN: %w", err)
}
err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN)
if err != nil {
return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
}
latestLSN, err := c.getCurrentLSN()
if err != nil {
return fmt.Errorf("failed to get current LSN: %w", err)
}
err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN)
if err != nil {
return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
}

return nil
Expand All @@ -273,7 +267,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1
records := make([][]interface{}, 0)
tableNameRowsMapping := make(map[string]uint32)

Expand Down
18 changes: 4 additions & 14 deletions flow/connectors/s3/s3.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error {
}

func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
if err != nil {
return 0, err
}

return syncBatchID, nil
return c.pgMetadata.GetLastBatchID(jobName)
}

func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
res, err := c.pgMetadata.FetchLastOffset(jobName)
if err != nil {
return nil, err
}

return res, nil
func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

// update offset for a job
Expand All @@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

tableNameRowsMapping := make(map[string]uint32)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
Expand Down
17 changes: 7 additions & 10 deletions flow/connectors/snowflake/snowflake.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -284,11 +284,11 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
return res, nil
}

func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return nil, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
defer func() {
// not sure if the errors these two return are same or different?
Expand All @@ -300,20 +300,17 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncStat

if !rows.Next() {
c.logger.Warn("No row found ,returning nil")
return nil, nil
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
return 0, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
return nil, nil
c.logger.Warn("Assuming zero offset means no sync has happened")
}
return &protos.LastSyncState{
Checkpoint: result.Int64,
}, nil
return result.Int64, nil
}

func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
Expand Down Expand Up @@ -496,7 +493,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1

res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,13 +30,13 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude {
type PullRecordsRequest struct {
// FlowJobName is the name of the flow job.
FlowJobName string
// LastSyncedID is the last ID that was synced.
LastSyncState *protos.LastSyncState
// LastOffset is the latest LSN that was synced.
LastOffset int64
// MaxBatchSize is the max number of records to fetch.
MaxBatchSize uint32
// IdleTimeout is the timeout to wait for new records.
IdleTimeout time.Duration
//relId to name Mapping
// relId to name Mapping
SrcTableIDNameMapping map[uint32]string
// source to destination table name mapping
TableNameMapping map[string]NameAndExclude
Expand Down
7 changes: 3 additions & 4 deletions flow/shared/constants.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -23,10 +23,9 @@ const (
ShutdownSignal
PauseSignal

CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor"
FlowNameKey ContextKey = "flowName"
PartitionIDKey ContextKey = "partitionId"
DeploymentUIDKey ContextKey = "deploymentUid"
FlowNameKey ContextKey = "flowName"
PartitionIDKey ContextKey = "partitionId"
DeploymentUIDKey ContextKey = "deploymentUid"
)

type TaskQueueID int64
Expand Down

0 comments on commit 363e7d4

Please sign in to comment.