diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index a47df1c8ab..e47a1c4a35 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -24,6 +24,11 @@ func NormalizeFlowWorkflow(ctx workflow.Context, var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput if err := workflow.ExecuteLocalActivity( fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + &protos.FetchSourceAndDestinationConfigsInput{ + FlowJobName: config.FlowJobName, + SourcePeerName: config.Source.Name, + DestinationPeerName: config.Destination.Name, + }, ).Get(ctx, &peerConfigs); err != nil { return nil, fmt.Errorf("failed to fetch peer configs: %w", err) } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 2486c9e114..871789baa9 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -47,6 +47,11 @@ func (s *SyncFlowExecution) executeSyncFlow( var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput if err := workflow.ExecuteLocalActivity( fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + &protos.FetchSourceAndDestinationConfigsInput{ + FlowJobName: config.FlowJobName, + SourcePeerName: config.Source.Name, + DestinationPeerName: config.Destination.Name, + }, ).Get(ctx, &peerConfigs); err != nil { return nil, fmt.Errorf("failed to fetch peer configs: %w", err) }