Skip to content

Commit

Permalink
Turns out you can't pass workflow.Channel as a parameter to a workflo…
Browse files Browse the repository at this point in the history
…w so rework things to use workflow ids & signals
  • Loading branch information
serprex committed Dec 5, 2023
1 parent f4ddf26 commit 7878cc6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
11 changes: 7 additions & 4 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,15 @@ func CDCFlowWorkflowWithConfig(

// sync will send normalize changes;
// which will be handled concurrently
syncNormChan := workflow.NewChannel(ctx)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
ctx,
NormalizeFlowWorkflow,
cfg,
syncNormChan,
)
var normExecution workflow.Execution
if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil {
return state, fmt.Errorf("normalize workflow failed to start: %w", err)
}

for {
// check and act on signals before a fresh flow starts.
Expand Down Expand Up @@ -367,7 +369,8 @@ func CDCFlowWorkflowWithConfig(
SyncFlowWorkflow,
cfg,
syncFlowOptions,
syncNormChan,
normExecution.ID,
normExecution.RunID,
)

var childSyncFlowRes *model.SyncResponse
Expand All @@ -385,7 +388,7 @@ func CDCFlowWorkflowWithConfig(
w.logger.Info("Total records synced: ", totalRecordsSynced)
}

syncNormChan.Close()
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "StopLoop", true)
var childNormalizeFlowRes *NormalizeFlowResult
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
Expand Down
70 changes: 41 additions & 29 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func NewNormalizeFlowExecution(ctx workflow.Context) *NormalizeFlowExecution {
func NormalizeFlowWorkflow(
ctx workflow.Context,
cfg *protos.FlowConnectionConfigs,
syncNormChan workflow.Channel,
) (*NormalizeFlowResult, error) {
schemaDeltas := workflow.GetSignalChannel(ctx, "SchemaDelta")
stopLoopChan := workflow.GetSignalChannel(ctx, "StopLoop")
w := NewCDCFlowWorkflowExecution(ctx)

res := NormalizeFlowResult{}
Expand All @@ -52,41 +53,52 @@ func NormalizeFlowWorkflow(
}
ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts)

var stopLoop bool
for {
var tableSchemaDeltas []*protos.TableSchemaDelta
if !syncNormChan.Receive(ctx, &tableSchemaDeltas) {
break
// Sequence channel checks carefully to avoid race condition;
// must check & process all schema deltas before breaking loop
if !stopLoop {
var stopLoopVal bool
stopLoop = stopLoopChan.ReceiveAsync(&stopLoopVal) && stopLoopVal
}

// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltas != nil {
modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas))
modifiedDstTables := make([]string, 0, len(tableSchemaDeltas))

for _, tableSchemaDelta := range tableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}
var tableSchemaDeltas []*protos.TableSchemaDelta
received, _ := schemaDeltas.ReceiveWithTimeout(ctx, 5*time.Second, &tableSchemaDeltas)
if received {
if len(tableSchemaDeltas) != 0 {
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
modifiedSrcTables := make([]string, 0, len(tableSchemaDeltas))
modifiedDstTables := make([]string, 0, len(tableSchemaDeltas))

for _, tableSchemaDelta := range tableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
modifiedDstTables = append(modifiedDstTables, tableSchemaDelta.DstTableName)
}

getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema,
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
})

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
res.NormalizeFlowErrors = multierror.Append(res.NormalizeFlowErrors, err)
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
}
}
}
} else if stopLoop {
break
} else {
continue
}

s := NewNormalizeFlowExecution(ctx)
Expand Down
11 changes: 7 additions & 4 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (s *SyncFlowExecution) executeSyncFlow(
config *protos.FlowConnectionConfigs,
opts *protos.SyncFlowOptions,
relationMessageMapping model.RelationMessageMapping,
syncNormChan workflow.Channel,
normFlowId string,
normFlowRunId string,
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", s.CDCFlowName)

Expand Down Expand Up @@ -96,7 +97,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
return nil, fmt.Errorf("failed to replay schema delta: %w", err)
}

syncNormChan.Send(ctx, syncRes.TableSchemaDeltas)
workflow.SignalExternalWorkflow(ctx, normFlowId, normFlowRunId, "SchemaDelta", syncRes.TableSchemaDeltas)

return syncRes, nil
}
Expand All @@ -107,7 +108,8 @@ func (s *SyncFlowExecution) executeSyncFlow(
func SyncFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
syncNormChan workflow.Channel,
normFlowId string,
normFlowRunId string,
) (*model.SyncResponse, error) {
s := NewSyncFlowExecution(ctx, &SyncFlowState{
CDCFlowName: config.FlowJobName,
Expand All @@ -119,6 +121,7 @@ func SyncFlowWorkflow(ctx workflow.Context,
config,
options,
options.RelationMessageMapping,
syncNormChan,
normFlowId,
normFlowRunId,
)
}

0 comments on commit 7878cc6

Please sign in to comment.