Skip to content

Commit

Permalink
don't overwrite options for child workflows when ContinueAsNew (#1232)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Feb 9, 2024
1 parent 804cff2 commit 1986b12
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 29 deletions.
33 changes: 15 additions & 18 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type CDCFlowWorkflowState struct {
Progress []string
// Accumulates status for sync flows spawned.
SyncFlowStatuses []*model.SyncResponse
// Accumulates status for sync flows spawned.
// Accumulates status for normalize flows spawned.
NormalizeFlowStatuses []model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal shared.CDCFlowSignal
Expand All @@ -48,8 +48,6 @@ type CDCFlowWorkflowState struct {
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
// initially copied from config, all changes are made here though
TableMappings []*protos.TableMapping
}
Expand All @@ -61,8 +59,9 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo
tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
}
return &CDCFlowWorkflowState{
Progress: []string{"started"},
SyncFlowStatuses: nil,
Progress: []string{"started"},
// 1 more than the limit of 10
SyncFlowStatuses: make([]*model.SyncResponse, 0, 11),
NormalizeFlowStatuses: nil,
ActiveSignal: shared.NoopSignal,
SyncFlowErrors: nil,
Expand All @@ -79,7 +78,6 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
TableMappings: tableMappings,
}
}
Expand Down Expand Up @@ -262,7 +260,7 @@ func CDCFlowWorkflowWithConfig(
if cfg.Resync {
for _, mapping := range state.TableMappings {
oldName := mapping.DestinationTableIdentifier
newName := fmt.Sprintf("%s_resync", oldName)
newName := oldName + "_resync"
mapping.DestinationTableIdentifier = newName
}
}
Expand Down Expand Up @@ -362,16 +360,16 @@ func CDCFlowWorkflowWithConfig(
}
}

state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
TableMappings: state.TableMappings,
}
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
// when we carry forward state, don't remake the options
if state.SyncFlowOptions == nil {
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
TableMappings: state.TableMappings,
}
}

currentSyncFlowNum := 0
Expand All @@ -396,7 +394,6 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
state.NormalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
Expand Down
10 changes: 3 additions & 7 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package peerflow

import (
"fmt"
"log/slog"
"time"

Expand All @@ -15,10 +14,8 @@ import (

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
options *protos.NormalizeFlowOptions,
) (*model.NormalizeFlowResponse, error) {
logger := workflow.GetLogger(ctx)
tableNameSchemaMapping := options.TableNameSchemaMapping

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
Expand All @@ -28,12 +25,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
results := make([]model.NormalizeResponse, 0, 4)
errors := make([]string, 0)
syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName)
var tableNameSchemaMapping map[string]*protos.TableSchema

var stopLoop, canceled bool
var lastSyncBatchID, syncBatchID int64
lastSyncBatchID = -1
syncBatchID = -1
selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-normalize", config.FlowJobName))
selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
Expand All @@ -46,9 +44,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
if s.SyncBatchID > syncBatchID {
syncBatchID = s.SyncBatchID
}
if len(s.TableNameSchemaMapping) != 0 {
tableNameSchemaMapping = s.TableNameSchemaMapping
}
tableNameSchemaMapping = s.TableNameSchemaMapping
})
for !stopLoop {
selector.Select(ctx)
Expand Down
4 changes: 0 additions & 4 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ message SyncFlowOptions {
repeated TableMapping table_mappings = 6;
}

message NormalizeFlowOptions {
map<string, TableSchema> table_name_schema_mapping = 1;
}

message LastSyncState {
int64 checkpoint = 1;
}
Expand Down

0 comments on commit 1986b12

Please sign in to comment.