Skip to content

Commit

Permalink
Use local activities in sync_flow / setup_flow (#1095)
Browse files Browse the repository at this point in the history
https://pkg.go.dev/go.temporal.io/[email protected]/workflow#ExecuteLocalActivity
Local activities are a hybrid between activities & side effects,
useful for short lived activities without heartbeats
  • Loading branch information
serprex authored Jan 18, 2024
1 parent e1bba89 commit 304841a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
19 changes: 11 additions & 8 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
) error {
s.logger.Info("checking connections for CDC flow - ", s.cdcFlowName)

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
checkCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: time.Minute,
})

// first check the source peer connection
srcConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, &protos.SetupInput{
srcConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, &protos.SetupInput{
Peer: config.Source,
FlowName: config.FlowJobName,
})
var srcConnStatus activities.CheckConnectionResult
if err := srcConnStatusFuture.Get(ctx, &srcConnStatus); err != nil {
if err := srcConnStatusFuture.Get(checkCtx, &srcConnStatus); err != nil {
return fmt.Errorf("failed to check source peer connection: %w", err)
}

Expand All @@ -75,18 +75,21 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
}

// then check the destination peer connection
destConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, dstSetupInput)
destConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, dstSetupInput)
var destConnStatus activities.CheckConnectionResult
if err := destConnStatusFuture.Get(ctx, &destConnStatus); err != nil {
if err := destConnStatusFuture.Get(checkCtx, &destConnStatus); err != nil {
return fmt.Errorf("failed to check destination peer connection: %w", err)
}

s.logger.Info("ensuring metadata table exists - ", s.cdcFlowName)

// then setup the destination peer metadata tables
if destConnStatus.NeedsSetupMetadataTables {
fDst := workflow.ExecuteActivity(ctx, flowable.SetupMetadataTables, dstSetupInput)
if err := fDst.Get(ctx, nil); err != nil {
setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
})
fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput)
if err := fDst.Get(setupCtx, nil); err != nil {
return fmt.Errorf("failed to setup destination peer metadata tables: %w", err)
}
} else {
Expand Down
5 changes: 2 additions & 3 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func (s *SyncFlowExecution) executeSyncFlow(
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", s.CDCFlowName)

syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
WaitForCancellation: true,
})

// execute GetLastSyncedID on destination peer
Expand All @@ -50,7 +49,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
FlowJobName: s.CDCFlowName,
}

lastSyncFuture := workflow.ExecuteActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
lastSyncFuture := workflow.ExecuteLocalActivity(syncMetaCtx, flowable.GetLastSyncedID, lastSyncInput)
var dstSyncState *protos.LastSyncState
if err := lastSyncFuture.Get(syncMetaCtx, &dstSyncState); err != nil {
return nil, fmt.Errorf("failed to get last synced ID from destination peer: %w", err)
Expand Down

0 comments on commit 304841a

Please sign in to comment.