Skip to content

Commit

Permalink
Move GetLastSyncedID into StartFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 9, 2024
1 parent 807c193 commit 8124726
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 60 deletions.
36 changes: 13 additions & 23 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,6 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
return nil
}

func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

func (a *FlowableActivity) EnsurePullability(
ctx context.Context,
config *protos.EnsurePullabilityBatchInput,
Expand Down Expand Up @@ -222,20 +203,23 @@ func (a *FlowableActivity) WaitForSourceConnector(
sessionID string,
) error {
for {
if err := ctx.Err(); err != nil {
return err
}
a.CdcCacheRw.RLock()
srcConn, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if ok {
// continue attempting to reconnect until workflow canceled,
// maybe their database is down, maybe permissions are wrong,
// neither reasons enough to fail entire mirror
if err := srcConn.SetupReplConn(); err != nil {
activity.GetLogger(ctx).Error("Failed to setup replication connection", "Error", err)
} else {
return nil
}
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
if err := ctx.Err(); err != nil {
return err
}
time.Sleep(time.Second)
}
}
Expand Down Expand Up @@ -293,12 +277,18 @@ func (a *FlowableActivity) StartFlow(
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

lastOffset, err := dstConn.GetLastOffset(flowName)
if err != nil {
return nil, err
}

errGroup.Go(func() error {
return srcConn.PullRecords(ctx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
LastOffset: lastOffset,
MaxBatchSize: batchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
Expand Down
9 changes: 3 additions & 6 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus

func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) {
query := `
SELECT
parent.oid AS parentrelid,
child.oid AS childrelid
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
`

rows, err := conn.Query(ctx, query, pgx.QueryExecModeSimpleProtocol)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}

defer rows.Close()

childToParentRelIDMap := make(map[uint32]uint32)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (c *PostgresConnector) MaybeStartReplication(
c.replState.Slot != slotName ||
c.replState.Publication != publicationName) {
c.replConn.Close(ctx)
} else if err := c.replConn.Ping(ctx); err != nil {
} else if c.replConn.Ping(ctx) != nil {
c.replConn.Close(ctx)
}

Expand Down
24 changes: 0 additions & 24 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,32 +536,8 @@ func CDCFlowWorkflowWithConfig(
currentSyncFlowNum += 1
w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum), slog.String("flowName", cfg.FlowJobName))

syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
})
lastSyncInput := &protos.GetLastSyncedIDInput{
PeerConnectionConfig: cfg.Destination,
FlowJobName: cfg.FlowJobName,
}

lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
var dstSyncState *protos.LastSyncState
if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil {
if ctx.Err() == nil {
w.logger.Error("failed to get last synced ID from destination peer", "error", err)
_ = workflow.Sleep(ctx, time.Second)
}
continue
}
if dstSyncState != nil {
w.logger.Info(fmt.Sprintf("last synced ID from destination peer - %d", dstSyncState.Checkpoint))
} else {
w.logger.Info("no last synced ID from destination peer")
}

startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: cfg,
LastSyncState: dstSyncState,
SyncFlowOptions: state.SyncFlowOptions,
RelationMessageMapping: state.RelationMessageMapping,
SrcTableIdNameMapping: state.SyncFlowOptions.SrcTableIdNameMapping,
Expand Down
6 changes: 0 additions & 6 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ message LastSyncState {
}

message StartFlowInput {
LastSyncState last_sync_state = 1;
FlowConnectionConfigs flow_connection_configs = 2;
SyncFlowOptions sync_flow_options = 3;
map<uint32, RelationMessage> relation_message_mapping = 4;
Expand All @@ -126,11 +125,6 @@ message StartNormalizeInput {
int64 SyncBatchID = 3;
}

message GetLastSyncedIDInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
}

message EnsurePullabilityBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
string flow_job_name = 2;
Expand Down

0 comments on commit 8124726

Please sign in to comment.