Skip to content

Commit

Permalink
Merge branch 'main' into generic-simple-schema-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 14, 2024
2 parents d0ef472 + 22d6528 commit dd3d718
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
15 changes: 12 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ func (a *FlowableActivity) SyncFlow(
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
Expand Down Expand Up @@ -315,6 +314,7 @@ func (a *FlowableActivity) SyncFlow(
if err != nil {
return nil, err
}
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
Expand Down Expand Up @@ -346,7 +346,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed in pull records when: %w", err)
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed in pull records when: %w", err)
}
}
logger.Info("no records to push")

Expand Down Expand Up @@ -401,7 +405,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed to pull records: %w", err)
}
}

numRecords := res.NumRecordsSynced
Expand All @@ -410,6 +418,7 @@ func (a *FlowableActivity) SyncFlow(
logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint := recordBatch.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)

err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type CDCPullConnector interface {
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error

// UpdateReplStateLastOffset is called once lastOffset has been confirmed as synced to the destination.
UpdateReplStateLastOffset(lastOffset int64)

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(ctx context.Context, jobName string) error

Expand Down
18 changes: 14 additions & 4 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -49,6 +50,7 @@ type ReplState struct {
Slot string
Publication string
Offset int64
LastOffset atomic.Int64
}

func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
Expand Down Expand Up @@ -133,7 +135,7 @@ func (c *PostgresConnector) ReplPing(ctx context.Context) error {
return pglogrepl.SendStandbyStatusUpdate(
ctx,
c.replConn.PgConn(),
pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)},
pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.LastOffset.Load())},
)
}
}
Expand Down Expand Up @@ -184,7 +186,9 @@ func (c *PostgresConnector) MaybeStartReplication(
Slot: slotName,
Publication: publicationName,
Offset: req.LastOffset,
LastOffset: atomic.Int64{},
}
c.replState.LastOffset.Store(req.LastOffset)
}
return nil
}
Expand Down Expand Up @@ -308,6 +312,9 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l
func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
req.RecordStream.Close()
if c.replState != nil {
c.replState.Offset = req.RecordStream.GetLastCheckpoint()
}
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
Expand Down Expand Up @@ -371,9 +378,6 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return err
}

req.RecordStream.Close()
c.replState.Offset = req.RecordStream.GetLastCheckpoint()

latestLSN, err := c.getCurrentLSN(ctx)
if err != nil {
c.logger.Error("error getting current LSN", slog.Any("error", err))
Expand All @@ -389,6 +393,12 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return nil
}

// UpdateReplStateLastOffset stores lastOffset in the connector's replication
// state. ReplPing reads this value back as the WALWritePosition of the standby
// status update it sends.
//
// It is a no-op when no replication state has been set up yet (replState is nil).
func (c *PostgresConnector) UpdateReplStateLastOffset(lastOffset int64) {
	state := c.replState
	if state == nil {
		// No replication state to update.
		return
	}
	state.LastOffset.Store(lastOffset)
}

// SyncRecords pushes records to the destination.
func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
Expand Down
4 changes: 2 additions & 2 deletions ui/app/alert-config/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ const AlertConfigPage: React.FC = () => {
<div>
<Label>
PeerDB has a built-in alerting feature to update you on your
mirrors. Here you can configure your Slack for PeerDB to send
alerts.
mirrors. Here you can configure your Alert Provider for PeerDB to
send alerts.
</Label>
</div>
<div style={{ ...tableStyle, marginTop: '2rem', maxHeight: '25em' }}>
Expand Down

0 comments on commit dd3d718

Please sign in to comment.