diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2e2616c207..7e49c002bc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -81,14 +81,19 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot func (a *FlowableActivity) GetLastSyncedID( ctx context.Context, config *protos.GetLastSyncedIDInput, -) (int64, error) { +) (*protos.LastSyncState, error) { dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { - return 0, fmt.Errorf("failed to get connector: %w", err) + return nil, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(dstConn) - return dstConn.GetLastOffset(config.FlowJobName) + var lastOffset int64 + lastOffset, err = dstConn.GetLastOffset(config.FlowJobName) + if err != nil { + return nil, err + } + return &protos.LastSyncState{Checkpoint: lastOffset}, nil } // EnsurePullability implements EnsurePullability. diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 471617a7f1..0b207bc65f 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -69,13 +69,13 @@ func (s *SyncFlowExecution) executeSyncFlow( } lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) - var dstLastOffset int64 - if err := lastSyncFuture.Get(syncMetaCtx, &dstLastOffset); err != nil { + var dstSyncState *protos.LastSyncState + if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil { return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err) } - if dstLastOffset != 0 { - msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstLastOffset) + if dstSyncState != nil { + msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint) s.logger.Info(msg) } else { s.logger.Info("no last synced ID from destination peer") @@ -89,7 +89,7 @@ func (s *SyncFlowExecution) executeSyncFlow( // execute StartFlow on the peers to start the flow startFlowInput := &protos.StartFlowInput{ FlowConnectionConfigs: config, - LastSyncState: &protos.LastSyncState{Checkpoint: dstLastOffset}, + LastSyncState: dstSyncState, SyncFlowOptions: opts, RelationMessageMapping: relationMessageMapping, }