diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index c7a2bd614..7e00eca4a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -81,25 +81,6 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot return nil } -func (a *FlowableActivity) GetLastSyncedID( - ctx context.Context, - config *protos.GetLastSyncedIDInput, -) (*protos.LastSyncState, error) { - ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) - if err != nil { - return nil, fmt.Errorf("failed to get connector: %w", err) - } - defer connectors.CloseConnector(ctx, dstConn) - - var lastOffset int64 - lastOffset, err = dstConn.GetLastOffset(ctx, config.FlowJobName) - if err != nil { - return nil, err - } - return &protos.LastSyncState{Checkpoint: lastOffset}, nil -} - func (a *FlowableActivity) EnsurePullability( ctx context.Context, config *protos.EnsurePullabilityBatchInput, @@ -225,6 +206,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, batchSize = 1_000_000 } + lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName) + if err != nil { + return nil, err + } + // start a goroutine to pull records from the source recordBatch := model.NewCDCRecordStream() startTime := time.Now() @@ -234,7 +220,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, FlowJobName: flowName, SrcTableIDNameMapping: input.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, - LastOffset: input.LastSyncState.Checkpoint, + LastOffset: lastOffset, MaxBatchSize: batchSize, IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds( int(input.SyncFlowOptions.IdleTimeoutSeconds), diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 4f52a0c68..e3a53c925 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,29 +41,6 @@ func (s *SyncFlowExecution) executeSyncFlow( ) (*model.SyncResponse, error) { s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName)) - syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, - }) - - // execute GetLastSyncedID on destination peer - lastSyncInput := &protos.GetLastSyncedIDInput{ - PeerConnectionConfig: config.Destination, - FlowJobName: s.CDCFlowName, - } - - lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) - var dstSyncState *protos.LastSyncState - if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil { - return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err) - } - - if dstSyncState != nil { - msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint) - s.logger.Info(msg) - } else { - s.logger.Info("no last synced ID from destination peer") - } - startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: time.Minute, @@ -73,7 +50,6 @@ func (s *SyncFlowExecution) executeSyncFlow( // execute StartFlow on the peers to start the flow startFlowInput := &protos.StartFlowInput{ FlowConnectionConfigs: config, - LastSyncState: dstSyncState, SyncFlowOptions: opts, RelationMessageMapping: relationMessageMapping, SrcTableIdNameMapping: opts.SrcTableIdNameMapping, diff --git a/protos/flow.proto b/protos/flow.proto index e51c873b6..2147ee2fc 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -106,11 +106,13 @@ message SyncFlowOptions { repeated TableMapping table_mappings = 6; } +// deprecated, unused message LastSyncState { int64 checkpoint = 1; } message StartFlowInput { + // deprecated, unused LastSyncState last_sync_state = 1; FlowConnectionConfigs flow_connection_configs = 2; SyncFlowOptions sync_flow_options = 3; @@ -125,11 +127,6 @@ message StartNormalizeInput { int64 SyncBatchID = 3; } -message GetLastSyncedIDInput { - peerdb_peers.Peer peer_connection_config = 1; - string flow_job_name = 2; -} - message EnsurePullabilityBatchInput { peerdb_peers.Peer peer_connection_config = 1; string flow_job_name = 2;