From 8c5948b05dc2c0ccd4b18ba5103596c3912cbff1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 14 Feb 2024 16:25:04 -0500 Subject: [PATCH 1/2] fetch peer configurations in workflow --- flow/activities/flowable.go | 44 ++++++++++++++++++++++++++++++++ flow/workflows/normalize_flow.go | 14 ++++++++++ flow/workflows/sync_flow.go | 13 ++++++++++ protos/flow.proto | 11 ++++++++ 4 files changed, 82 insertions(+) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2086aa8cfd..50f979510c 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -207,6 +207,50 @@ func (a *FlowableActivity) CreateNormalizedTable( }, nil } +func (a *FlowableActivity) fetchPeerConfig( + ctx context.Context, + peerName string, +) (*protos.Peer, error) { + var bytes []byte + err := a.CatalogPool.QueryRow(ctx, `SELECT options FROM peers WHERE name = $1`, peerName).Scan(&bytes) + if err != nil { + return nil, fmt.Errorf("failed to get peer config: %w", err) + } + + var peerConfig *protos.Peer + err = proto.Unmarshal(bytes, peerConfig) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal peer config: %w", err) + } + + return peerConfig, nil +} + +func (a *FlowableActivity) FetchSourceAndDestinationConfigs( + ctx context.Context, + input *protos.FetchSourceAndDestinationConfigsInput, +) (*protos.FetchSourceAndDestinationConfigsOutput, error) { + ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) + logger := activity.GetLogger(ctx) + + srcConfig, err := a.fetchPeerConfig(ctx, input.SourcePeerName) + if err != nil { + logger.Error("failed to fetch source config", slog.Any("error", err)) + return nil, fmt.Errorf("failed to fetch source config: %w", err) + } + + dstConfig, err := a.fetchPeerConfig(ctx, input.DestinationPeerName) + if err != nil { + logger.Error("failed to fetch destination config", slog.Any("error", err)) + return nil, fmt.Errorf("failed to fetch destination config: %w", err) + } + + return &protos.FetchSourceAndDestinationConfigsOutput{ + SourcePeer: srcConfig, + DestinationPeer: dstConfig, + }, nil +} + func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput, ) (*model.SyncResponse, error) { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index f37544b14a..a47df1c8ab 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -1,6 +1,7 @@ package peerflow import ( + "fmt" "log/slog" "time" @@ -17,6 +18,19 @@ func NormalizeFlowWorkflow(ctx workflow.Context, ) (*model.NormalizeFlowResponse, error) { logger := workflow.GetLogger(ctx) + fetchPeerConfigsCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: 1 * time.Hour, + }) + var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput + if err := workflow.ExecuteLocalActivity( + fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + ).Get(ctx, &peerConfigs); err != nil { + return nil, fmt.Errorf("failed to fetch peer configs: %w", err) + } + + config.Source = peerConfigs.SourcePeer + config.Destination = peerConfigs.DestinationPeer + normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, HeartbeatTimeout: time.Minute, diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index e3a53c9250..2486c9e114 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,6 +41,19 @@ func (s *SyncFlowExecution) executeSyncFlow( ) (*model.SyncResponse, error) { s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName)) + fetchPeerConfigsCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: 1 * time.Hour, + }) + var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput + if err := workflow.ExecuteLocalActivity( + fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + ).Get(ctx, &peerConfigs); err != nil { + return nil, fmt.Errorf("failed to fetch peer configs: %w", err) + } + + config.Source = peerConfigs.SourcePeer + config.Destination = peerConfigs.DestinationPeer + startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: time.Minute, diff --git a/protos/flow.proto b/protos/flow.proto index 2147ee2fc8..b741357e22 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -401,3 +401,14 @@ message AddTablesToPublicationInput{ repeated TableMapping additional_tables = 3; } +message FetchSourceAndDestinationConfigsInput { + string source_peer_name = 1; + string destination_peer_name = 2; + string flow_job_name = 3; +} + +message FetchSourceAndDestinationConfigsOutput { + peerdb_peers.Peer source_peer = 1; + peerdb_peers.Peer destination_peer = 2; +} + From 637f9196667e07352844c8dd9494bf5b1063444e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 14 Feb 2024 16:31:59 -0500 Subject: [PATCH 2/2] pass input for real --- flow/workflows/normalize_flow.go | 5 +++++ flow/workflows/sync_flow.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index a47df1c8ab..e47a1c4a35 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -24,6 +24,11 @@ func NormalizeFlowWorkflow(ctx workflow.Context, var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput if err := workflow.ExecuteLocalActivity( fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + &protos.FetchSourceAndDestinationConfigsInput{ + FlowJobName: config.FlowJobName, + SourcePeerName: config.Source.Name, + DestinationPeerName: config.Destination.Name, + }, ).Get(ctx, &peerConfigs); err != nil { return nil, fmt.Errorf("failed to fetch peer configs: %w", err) } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 2486c9e114..871789baa9 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -47,6 +47,11 @@ func (s *SyncFlowExecution) executeSyncFlow( var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput if err := workflow.ExecuteLocalActivity( fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs, + &protos.FetchSourceAndDestinationConfigsInput{ + FlowJobName: config.FlowJobName, + SourcePeerName: config.Source.Name, + DestinationPeerName: config.Destination.Name, + }, ).Get(ctx, &peerConfigs); err != nil { return nil, fmt.Errorf("failed to fetch peer configs: %w", err) }