Skip to content

Commit

Permalink
fetch peer configurations in workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 14, 2024
1 parent 758727a commit 8c5948b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 0 deletions.
44 changes: 44 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,50 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) fetchPeerConfig(
ctx context.Context,
peerName string,
) (*protos.Peer, error) {
var bytes []byte
err := a.CatalogPool.QueryRow(ctx, `SELECT options FROM peers WHERE name = $1`, peerName).Scan(&bytes)
if err != nil {
return nil, fmt.Errorf("failed to get peer config: %w", err)
}

var peerConfig *protos.Peer
err = proto.Unmarshal(bytes, peerConfig)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal peer config: %w", err)
}

return peerConfig, nil
}

func (a *FlowableActivity) FetchSourceAndDestinationConfigs(
ctx context.Context,
input *protos.FetchSourceAndDestinationConfigsInput,
) (*protos.FetchSourceAndDestinationConfigsOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowJobName)
logger := activity.GetLogger(ctx)

srcConfig, err := a.fetchPeerConfig(ctx, input.SourcePeerName)
if err != nil {
logger.Error("failed to fetch source config", slog.Any("error", err))
return nil, fmt.Errorf("failed to fetch source config: %w", err)
}

dstConfig, err := a.fetchPeerConfig(ctx, input.DestinationPeerName)
if err != nil {
logger.Error("failed to fetch destination config", slog.Any("error", err))
return nil, fmt.Errorf("failed to fetch destination config: %w", err)
}

return &protos.FetchSourceAndDestinationConfigsOutput{
SourcePeer: srcConfig,
DestinationPeer: dstConfig,
}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
Expand Down
14 changes: 14 additions & 0 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package peerflow

import (
"fmt"
"log/slog"
"time"

Expand All @@ -17,6 +18,19 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
) (*model.NormalizeFlowResponse, error) {
logger := workflow.GetLogger(ctx)

fetchPeerConfigsCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Hour,
})
var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput
if err := workflow.ExecuteLocalActivity(
fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs,
).Get(ctx, &peerConfigs); err != nil {
return nil, fmt.Errorf("failed to fetch peer configs: %w", err)
}

config.Source = peerConfigs.SourcePeer
config.Destination = peerConfigs.DestinationPeer

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: time.Minute,
Expand Down
13 changes: 13 additions & 0 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ func (s *SyncFlowExecution) executeSyncFlow(
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName))

fetchPeerConfigsCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Hour,
})
var peerConfigs *protos.FetchSourceAndDestinationConfigsOutput
if err := workflow.ExecuteLocalActivity(
fetchPeerConfigsCtx, flowable.FetchSourceAndDestinationConfigs,
).Get(ctx, &peerConfigs); err != nil {
return nil, fmt.Errorf("failed to fetch peer configs: %w", err)
}

config.Source = peerConfigs.SourcePeer
config.Destination = peerConfigs.DestinationPeer

startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: time.Minute,
Expand Down
11 changes: 11 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,14 @@ message AddTablesToPublicationInput{
repeated TableMapping additional_tables = 3;
}

message FetchSourceAndDestinationConfigsInput {
string source_peer_name = 1;
string destination_peer_name = 2;
string flow_job_name = 3;
}

message FetchSourceAndDestinationConfigsOutput {
peerdb_peers.Peer source_peer = 1;
peerdb_peers.Peer destination_peer = 2;
}

0 comments on commit 8c5948b

Please sign in to comment.