diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7f3af3ea59..46fa5984e9 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -84,25 +84,6 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot return nil } -func (a *FlowableActivity) GetLastSyncedID( - ctx context.Context, - config *protos.GetLastSyncedIDInput, -) (*protos.LastSyncState, error) { - ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) - if err != nil { - return nil, fmt.Errorf("failed to get connector: %w", err) - } - defer connectors.CloseConnector(dstConn) - - var lastOffset int64 - lastOffset, err = dstConn.GetLastOffset(config.FlowJobName) - if err != nil { - return nil, err - } - return &protos.LastSyncState{Checkpoint: lastOffset}, nil -} - func (a *FlowableActivity) EnsurePullability( ctx context.Context, config *protos.EnsurePullabilityBatchInput, @@ -222,13 +203,13 @@ func (a *FlowableActivity) WaitForSourceConnector( sessionID string, ) error { for { - if err := ctx.Err(); err != nil { - return err - } a.CdcCacheRw.RLock() srcConn, ok := a.CdcCache[sessionID] a.CdcCacheRw.RUnlock() if ok { + // continue attempting to reconnect until workflow canceled, + // maybe their database is down, maybe permissions are wrong, + // neither reasons enough to fail entire mirror if err := srcConn.SetupReplConn(); err != nil { activity.GetLogger(ctx).Error("Failed to setup replication connection", "Error", err) } else { @@ -236,6 +217,9 @@ func (a *FlowableActivity) WaitForSourceConnector( } } activity.RecordHeartbeat(ctx, "wait another second for source connector") + if err := ctx.Err(); err != nil { + return err + } time.Sleep(time.Second) } } @@ -293,12 +277,18 @@ func (a *FlowableActivity) StartFlow( recordBatch := model.NewCDCRecordStream() startTime := time.Now() flowName := input.FlowConnectionConfigs.FlowJobName + + lastOffset, err := dstConn.GetLastOffset(flowName) + if err != nil { + return nil, err + } + errGroup.Go(func() error { return srcConn.PullRecords(ctx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, SrcTableIDNameMapping: input.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, - LastOffset: input.LastSyncState.Checkpoint, + LastOffset: lastOffset, MaxBatchSize: batchSize, IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds( int(input.SyncFlowOptions.IdleTimeoutSeconds), diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 502550a17b..2b3c229add 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -84,12 +84,10 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) { query := ` - SELECT - parent.oid AS parentrelid, - child.oid AS childrelid + SELECT parent.oid AS parentrelid, child.oid AS childrelid FROM pg_inherits - JOIN pg_class parent ON pg_inherits.inhparent = parent.oid - JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid WHERE parent.relkind='p'; ` @@ -97,7 +95,6 @@ func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u if err != nil { return nil, fmt.Errorf("error querying for child to parent relid map: %w", err) } - defer rows.Close() childToParentRelIDMap := make(map[uint32]uint32) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index b322822948..1a0c4eac83 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -160,7 +160,7 @@ func (c *PostgresConnector) MaybeStartReplication( c.replState.Slot != slotName || c.replState.Publication != publicationName) { c.replConn.Close(ctx) - } else if err := c.replConn.Ping(ctx); err != nil { + } else if c.replConn.Ping(ctx) != nil { c.replConn.Close(ctx) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 621ec21744..4e789be670 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -538,32 +538,8 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum += 1 w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum), slog.String("flowName", cfg.FlowJobName)) - syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, - }) - lastSyncInput := &protos.GetLastSyncedIDInput{ - PeerConnectionConfig: cfg.Destination, - FlowJobName: cfg.FlowJobName, - } - - lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) - var dstSyncState *protos.LastSyncState - if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil { - if ctx.Err() == nil { - w.logger.Error("failed to get last synced ID from destination peer", "error", err) - _ = workflow.Sleep(ctx, time.Second) - } - continue - } - if dstSyncState != nil { - w.logger.Info(fmt.Sprintf("last synced ID from destination peer - %d", dstSyncState.Checkpoint)) - } else { - w.logger.Info("no last synced ID from destination peer") - } - startFlowInput := &protos.StartFlowInput{ FlowConnectionConfigs: cfg, - LastSyncState: dstSyncState, SyncFlowOptions: state.SyncFlowOptions, RelationMessageMapping: state.RelationMessageMapping, SrcTableIdNameMapping: state.SyncFlowOptions.SrcTableIdNameMapping, diff --git a/protos/flow.proto b/protos/flow.proto index 63d32bb4e5..938621f0e3 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -115,7 +115,6 @@ message LastSyncState { } message StartFlowInput { - LastSyncState last_sync_state = 1; FlowConnectionConfigs flow_connection_configs = 2; SyncFlowOptions sync_flow_options = 3; map relation_message_mapping = 4; @@ -130,11 +129,6 @@ message StartNormalizeInput { int64 SyncBatchID = 3; } -message GetLastSyncedIDInput { - peerdb_peers.Peer peer_connection_config = 1; - string flow_job_name = 2; -} - message EnsurePullabilityBatchInput { peerdb_peers.Peer peer_connection_config = 1; string flow_job_name = 2;