Skip to content

Commit

Permalink
Don't make breaking change to GetLastSyncedID
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 14, 2023
1 parent a78644b commit c05f476
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
11 changes: 8 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,19 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (int64, error) {
) (*protos.LastSyncState, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return 0, fmt.Errorf("failed to get connector: %w", err)
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

return dstConn.GetLastOffset(config.FlowJobName)
var lastOffset int64
lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
if err != nil {
return nil, err
}
return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

// EnsurePullability implements EnsurePullability.
Expand Down
10 changes: 5 additions & 5 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ func (s *SyncFlowExecution) executeSyncFlow(
}

lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
var dstLastOffset int64
if err := lastSyncFuture.Get(syncMetaCtx, &dstLastOffset); err != nil {
var dstSyncState *protos.LastSyncState
if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil {
return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err)
}

if dstLastOffset != 0 {
msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstLastOffset)
if dstSyncState != nil {
msg := fmt.Sprintf("last synced ID from destination peer - %d\n", dstSyncState.Checkpoint)
s.logger.Info(msg)
} else {
s.logger.Info("no last synced ID from destination peer")
Expand All @@ -89,7 +89,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
// execute StartFlow on the peers to start the flow
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: config,
LastSyncState: &protos.LastSyncState{Checkpoint: dstLastOffset},
LastSyncState: dstSyncState,
SyncFlowOptions: opts,
RelationMessageMapping: relationMessageMapping,
}
Expand Down

0 comments on commit c05f476

Please sign in to comment.