Skip to content

Commit

Permalink
One normalize for every 32 syncs
Browse files: browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 5, 2023
1 parent 1b2fbf4 commit 61e0824
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 80 deletions.
21 changes: 20 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ func CDCFlowWorkflowWithConfig(
currentSyncFlowNum := 0
totalRecordsSynced := 0

// sync will send normalize changes;
// which will be handled concurrently
syncNormChan := workflow.NewChannel(ctx)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
NormalizeFlowWorkflow,
cfg,
syncNormChan,
)

for {
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignalAsync(ctx, state)
Expand Down Expand Up @@ -357,6 +367,7 @@ func CDCFlowWorkflowWithConfig(
SyncFlowWorkflow,
cfg,
syncFlowOptions,
syncNormChan,
)

var childSyncFlowRes *model.SyncResponse
Expand All @@ -372,8 +383,16 @@ func CDCFlowWorkflowWithConfig(
}

w.logger.Info("Total records synced: ", totalRecordsSynced)
}

/* TODO send childSyncFlowRes.TableSchemaDeltas */
syncNormChan.Close()
var childNormalizeFlowRes *NormalizeFlowResult
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err)
} else {
state.NormalizeFlowStatuses = childNormalizeFlowRes.NormalizeFlowStatuses
state.NormalizeFlowErrors = childNormalizeFlowRes.NormalizeFlowErrors
}

// cancel the SendWalHeartbeat activity
Expand Down
132 changes: 54 additions & 78 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package peerflow

import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -13,20 +12,18 @@ import (
"go.temporal.io/sdk/workflow"
)

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
type NormalizeFlowResult struct {
NormalizeFlowStatuses []*model.NormalizeResponse
NormalizeFlowErrors error
}

func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
Expand All @@ -35,17 +32,15 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState)
func NormalizeFlowWorkflow(
ctx workflow.Context,
cfg *protos.FlowConnectionConfigs,
state *CDCFlowWorkflowState,
) (*CDCFlowWorkflowResult, error) {
if state == nil {
state = NewCDCFlowWorkflowState()
}

syncNormChan workflow.Channel,
) (*NormalizeFlowResult, error) {
w := NewCDCFlowWorkflowExecution(ctx)

res := NormalizeFlowResult {}

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
return nil, err
}

childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
Expand All @@ -57,85 +52,66 @@ func NormalizeFlowWorkflow(
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

/* TODO LISTEN FOR TABLE SCHEMA DELTAS */
var tableSchemaDeltas []*protos.TableSchemaDelta = nil
/*
if childSyncFlowRes != nil {
tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas
for {
var tableSchemaDeltasRecv interface{}
if !syncNormChan.Receive(ctx, &tableSchemaDeltasRecv) {
break
}
*/
var tableSchemaDeltas []*protos.TableSchemaDelta = tableSchemaDeltasRecv.([]*protos.TableSchemaDelta)

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltas != nil {
modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas))
modifiedDstTables := make([]string, 0, len(tableSchemaDeltas))
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltas != nil {
modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas))
modifiedDstTables := make([]string, 0, len(tableSchemaDeltas))

for _, tableSchemaDelta := range tableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}
for _, tableSchemaDelta := range tableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
}
}
}
}

childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
NormalizeFlowWorkflow,
cfg,
)

selector := workflow.NewSelector(ctx)
selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) {
var childNormalizeFlowRes *model.NormalizeResponse
if err := f.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err)
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
})
selector.Select(ctx)

s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: cfg.FlowJobName,
Progress: []string{},
})
s := NewNormalizeFlowExecution(ctx)

s.logger.Info("executing normalize flow - ", s.CDCFlowName)
s.logger.Info("executing normalize flow - ", cfg.FlowJobName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})
normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: cfg,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: cfg,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
} else {
res.NormalizeFlowStatuses = append(res.NormalizeFlowStatuses, normalizeResponse)
}
}

return state, nil
return &res, nil
}
12 changes: 11 additions & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
config *protos.FlowConnectionConfigs,
opts *protos.SyncFlowOptions,
relationMessageMapping model.RelationMessageMapping,
syncNormChan workflow.Channel,
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", s.CDCFlowName)

Expand Down Expand Up @@ -95,6 +96,8 @@ func (s *SyncFlowExecution) executeSyncFlow(
return nil, fmt.Errorf("failed to replay schema delta: %w", err)
}

syncNormChan.Send(ctx, syncRes.TableSchemaDeltas)

return syncRes, nil
}

Expand All @@ -104,11 +107,18 @@ func (s *SyncFlowExecution) executeSyncFlow(
func SyncFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
syncNormChan workflow.Channel,
) (*model.SyncResponse, error) {
s := NewSyncFlowExecution(ctx, &SyncFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping)
return s.executeSyncFlow(
ctx,
config,
options,
options.RelationMessageMapping,
syncNormChan,
)
}

0 comments on commit 61e0824

Please sign in to comment.