diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index b23928f328..0b9168a931 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -55,17 +55,17 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( ) error { s.logger.Info("checking connections for CDC flow - ", s.cdcFlowName) - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 2 * time.Minute, + checkCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: time.Minute, }) // first check the source peer connection - srcConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, &protos.SetupInput{ + srcConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, &protos.SetupInput{ Peer: config.Source, FlowName: config.FlowJobName, }) var srcConnStatus activities.CheckConnectionResult - if err := srcConnStatusFuture.Get(ctx, &srcConnStatus); err != nil { + if err := srcConnStatusFuture.Get(checkCtx, &srcConnStatus); err != nil { return fmt.Errorf("failed to check source peer connection: %w", err) } @@ -75,9 +75,9 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( } // then check the destination peer connection - destConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, dstSetupInput) + destConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, dstSetupInput) var destConnStatus activities.CheckConnectionResult - if err := destConnStatusFuture.Get(ctx, &destConnStatus); err != nil { + if err := destConnStatusFuture.Get(checkCtx, &destConnStatus); err != nil { return fmt.Errorf("failed to check destination peer connection: %w", err) } @@ -85,8 +85,11 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( // then setup the destination peer metadata tables if destConnStatus.NeedsSetupMetadataTables { - fDst := workflow.ExecuteActivity(ctx, flowable.SetupMetadataTables, dstSetupInput) - if err := fDst.Get(ctx, nil); err != nil { + setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 2 * time.Minute, + }) + fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput) + if err := fDst.Get(setupCtx, nil); err != nil { return fmt.Errorf("failed to setup destination peer metadata tables: %w", err) } } else { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 8c1028069d..38a22f1961 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -39,9 +39,8 @@ func (s *SyncFlowExecution) executeSyncFlow( ) (*model.SyncResponse, error) { s.logger.Info("executing sync flow - ", s.CDCFlowName) - syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ StartToCloseTimeout: 1 * time.Minute, - WaitForCancellation: true, }) // execute GetLastSyncedID on destination peer @@ -50,7 +49,7 @@ func (s *SyncFlowExecution) executeSyncFlow( FlowJobName: s.CDCFlowName, } - lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) + lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput) var dstSyncState *protos.LastSyncState if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil { return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err)