Fix & test cdc rollover (#1436)
Customer workflow recently got stuck after sync-stop;
add a test covering cdc_flow returning ContinueAsNew multiple times.

cdc_flow had custom retry logic to pass tests dating from before they were switched to integration tests.
Get rid of that: it was racy with signals. Rely instead on Temporal retries for workflow errors,
and on restarting CDC entirely if sync/normalize finish for whatever reason.
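
For context, the rollover being tested is Temporal's ContinueAsNew mechanism, which this commit leans on instead of hand-rolled retries. Below is a minimal sketch of that pattern, assuming the Temporal Go SDK (go.temporal.io/sdk/workflow); RolloverWorkflow, maxSyncsPerRun, and the integer state are illustrative stand-ins, not actual cdc_flow code.

package sketch

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

const maxSyncsPerRun = 32 // bounds one run's event history, like maxSyncsPerCdcFlow in the diff below

// RolloverWorkflow runs a bounded number of iterations, then returns
// ContinueAsNewError so Temporal restarts the workflow with carried-over
// arguments and a fresh event history. Errors are returned as-is, leaving
// restarts to the workflow's Temporal retry policy.
func RolloverWorkflow(ctx workflow.Context, processed int) (int, error) {
	for syncCount := 0; syncCount < maxSyncsPerRun; syncCount++ {
		if err := workflow.Sleep(ctx, time.Second); err != nil {
			return processed, err // canceled: surface the error instead of retrying here
		}
		processed++ // stand-in for one sync/normalize round
	}
	// Roll over instead of looping forever within a single run.
	return processed, workflow.NewContinueAsNewError(ctx, RolloverWorkflow, processed)
}

Registered and started like any other workflow, each run then caps its own history at maxSyncsPerRun iterations, which is exactly what the new Test_ContinueAsNew below exercises several times over.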
serprex authored Mar 5, 2024
1 parent 5ea04a8 commit 0a32419
Showing 3 changed files with 105 additions and 88 deletions.
45 changes: 44 additions & 1 deletion flow/e2e/postgres/peer_flow_pg_test.go
@@ -1060,7 +1060,50 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
e2e.RequireEnvCanceled(s.t, env)
}

// test doesn't work, make it work later
func (s PeerFlowE2ETestSuitePG) Test_ContinueAsNew() {
srcTableName := s.attachSchemaSuffix("test_continueasnew")
dstTableName := s.attachSchemaSuffix("test_continueasnew_dst")

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_continueasnew_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.peer,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig.MaxBatchSize = 2
flowConnConfig.IdleTimeoutSeconds = 10
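// With MaxBatchSize 2, the 144 inserts below take 72 syncs, well past
// maxSyncsPerCdcFlow (32), so CDCFlowWorkflow must return ContinueAsNew multiple times.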

tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)

e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
for i := range 144 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 144 rows into the source table")

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 72 syncs", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,key,value") == nil
})
env.Cancel()

e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
srcTable1Name := s.attachSchemaSuffix("test_dynconfig_1")
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -149,7 +149,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
qrepConfig.SetupWatermarkTableOnDestination = true

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
e2e.EnvWaitForFinished(s.t, env, 5*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
@@ -226,7 +226,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
qrepConfig.SetupWatermarkTableOnDestination = true

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
e2e.EnvWaitForFinished(s.t, env, 5*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
144 changes: 59 additions & 85 deletions flow/workflows/cdc_flow.go
@@ -53,22 +53,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
}
}

// CDCFlowWorkflowExecution represents the state for execution of a peer flow.
type CDCFlowWorkflowExecution struct {
flowExecutionID string
logger log.Logger
syncFlowFuture workflow.ChildWorkflowFuture
normFlowFuture workflow.ChildWorkflowFuture
}

// NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution.
func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlowWorkflowExecution {
return &CDCFlowWorkflowExecution{
flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName)),
}
}

func GetSideEffect[T any](ctx workflow.Context, f func(workflow.Context) T) T {
sideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return f(ctx)
@@ -103,24 +87,26 @@ const (
maxSyncsPerCdcFlow = 32
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
func processCDCFlowConfigUpdate(
ctx workflow.Context,
logger log.Logger,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
flowConfigUpdate := state.FlowConfigUpdate

if flowConfigUpdate != nil {
w.logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
if len(flowConfigUpdate.AdditionalTables) == 0 {
return nil
}
if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
logger.Warn("duplicate source/destination tables found in additionalTables")
return nil
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

w.logger.Info("altering publication for additional tables")
logger.Info("altering publication for additional tables")
alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
@@ -129,11 +115,11 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
flowable.AddTablesToPublication,
cfg, flowConfigUpdate.AdditionalTables)
if err := alterPublicationAddAdditionalTablesFuture.Get(ctx, nil); err != nil {
w.logger.Error("failed to alter publication for additional tables: ", err)
logger.Error("failed to alter publication for additional tables: ", err)
return err
}

w.logger.Info("additional tables added to publication")
logger.Info("additional tables added to publication")
additionalTablesUUID := GetUUID(ctx)
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
@@ -167,13 +153,14 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
w.logger.Info("additional tables added to sync flow")
logger.Info("additional tables added to sync flow")
}
return nil
}

func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
func addCdcPropertiesSignalListener(
ctx workflow.Context,
logger log.Logger,
selector workflow.Selector,
state *CDCFlowWorkflowState,
) {
@@ -189,21 +176,13 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

w.logger.Info("CDC Signal received. Parameters on signal reception:",
logger.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})
}

func (w *CDCFlowWorkflowExecution) startSyncFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions) {
w.syncFlowFuture = workflow.ExecuteChildWorkflow(ctx, SyncFlowWorkflow, config, options)
}

func (w *CDCFlowWorkflowExecution) startNormFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs) {
w.normFlowFuture = workflow.ExecuteChildWorkflow(ctx, NormalizeFlowWorkflow, config, nil)
}

func CDCFlowWorkflow(
ctx workflow.Context,
cfg *protos.FlowConnectionConfigs,
@@ -217,7 +196,7 @@ func CDCFlowWorkflow(
state = NewCDCFlowWorkflowState(cfg)
}

w := NewCDCFlowWorkflowExecution(ctx, cfg.FlowJobName)
logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName))
flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)

err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
@@ -248,36 +227,36 @@ func CDCFlowWorkflow(
selector := workflow.NewNamedSelector(ctx, "PauseLoop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
})
w.addCdcPropertiesSignalListener(ctx, selector, state)
addCdcPropertiesSignalListener(ctx, logger, selector, state)

startTime := workflow.Now(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
selector.Select(ctx)
}
if err := ctx.Err(); err != nil {
return state, err
}

if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch)
err = processCDCFlowConfigUpdate(ctx, logger, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
w.logger.Info("wiping flow state after state update processing")
logger.Info("wiping flow state after state update processing")
// finished processing, wipe it
state.FlowConfigUpdate = nil
state.ActiveSignal = model.NoopSignal
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
logger.Info("mirror has been resumed after ", time.Since(startTime))
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
}

@@ -346,7 +325,7 @@ func CDCFlowWorkflow(
state.SyncFlowOptions.TableNameSchemaMapping,
)
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
w.logger.Error("snapshot flow failed", slog.Any("error", err))
logger.Error("snapshot flow failed", slog.Any("error", err))
return state, fmt.Errorf("failed to execute snapshot workflow: %w", err)
}

@@ -385,7 +364,7 @@ func CDCFlowWorkflow(
}

state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
w.logger.Info("executed setup flow and snapshot flow")
logger.Info("executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialSnapshotOnly {
@@ -424,70 +403,56 @@ func CDCFlowWorkflow(
handleError := func(name string, err error) {
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
w.logger.Error(
logger.Error(
"panic in flow",
slog.String("name", name),
slog.Any("error", panicErr.Error()),
slog.String("stack", panicErr.StackTrace()),
)
} else {
w.logger.Error("error in flow", slog.String("name", name), slog.Any("error", err))
logger.Error("error in flow", slog.String("name", name), slog.Any("error", err))
}
}

finishSyncNormalize := func() {
restart = true
_ = model.SyncStopSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, struct{}{}).Get(ctx, nil)
}
syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)
normFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil)

mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})

var handleNormFlow, handleSyncFlow func(workflow.Future)
handleSyncFlow = func(f workflow.Future) {
mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
handleError("sync", err)
}

if restart {
w.logger.Info("sync finished, finishing normalize")
w.syncFlowFuture = nil
_ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, model.NormalizePayload{
logger.Info("sync finished, finishing normalize")
syncFlowFuture = nil
restart = true
if normFlowFuture != nil {
err = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{
Done: true,
SyncBatchID: -1,
}).Get(ctx, nil)
} else {
w.logger.Warn("sync flow ended, restarting", slog.Any("error", err))
w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions)
mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow)
if err != nil {
logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err))
finished = true
}
}
}
handleNormFlow = func(f workflow.Future) {
})
mainLoopSelector.AddFuture(normFlowFuture, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
handleError("normalize", err)
}

if restart {
w.logger.Info("normalize finished")
w.normFlowFuture = nil
finished = true
} else {
w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err))
w.startNormFlow(normCtx, cfg)
mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow)
}
}

w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions)
mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow)

w.startNormFlow(normCtx, cfg)
mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow)
logger.Info("normalize finished, finishing")
normFlowFuture = nil
restart = true
finished = true
})

flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
})

syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx)
@@ -504,7 +469,9 @@ func CDCFlowWorkflow(

normChan := model.NormalizeSignal.GetSignalChannel(ctx)
normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) {
_ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, payload).Get(ctx, nil)
if normFlowFuture != nil {
_ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil)
}
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping)
})

@@ -514,13 +481,13 @@ func CDCFlowWorkflow(
if !parallel {
normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
if w.syncFlowFuture != nil {
_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil)
if syncFlowFuture != nil {
_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
}
})
}

w.addCdcPropertiesSignalListener(ctx, mainLoopSelector, state)
addCdcPropertiesSignalListener(ctx, logger, mainLoopSelector, state)

state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
for {
@@ -529,12 +496,19 @@ func CDCFlowWorkflow(
mainLoopSelector.Select(ctx)
}
if err := ctx.Err(); err != nil {
w.logger.Info("mirror canceled", slog.Any("error", err))
logger.Info("mirror canceled", slog.Any("error", err))
return state, err
}

if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow {
finishSyncNormalize()
restart = true
if syncFlowFuture != nil {
err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)
if err != nil {
logger.Warn("failed to send sync-stop, finishing", slog.Any("error", err))
finished = true
}
}
}

if restart {
@@ -547,7 +521,7 @@ func CDCFlowWorkflow(
}

if err := ctx.Err(); err != nil {
w.logger.Info("mirror canceled", slog.Any("error", err))
logger.Info("mirror canceled", slog.Any("error", err))
return nil, err
}

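A design note on the rewritten main loop above: the old code re-launched a finished sync or normalize child in place (handleSyncFlow/handleNormFlow re-adding themselves to the selector), which was racy with signals. Now each child future is registered once, completion only flips the restart/finished flags, and the workflow exits the loop to ContinueAsNew. Below is a minimal sketch of that selector shape, assuming the Temporal Go SDK; mainLoopSketch and its single child future are illustrative simplifications of the real two-child loop.

package sketch

import (
	"go.temporal.io/sdk/workflow"
)

// mainLoopSketch shows the post-commit shape: the child future is registered
// once on a selector, a finished child only sets a flag, and the caller (not
// shown) turns restart=true into workflow.NewContinueAsNewError.
func mainLoopSketch(ctx workflow.Context, child workflow.Future) (bool, error) {
	restart := false
	var childErr error
	selector := workflow.NewNamedSelector(ctx, "MainLoop")
	selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
	selector.AddFuture(child, func(f workflow.Future) {
		childErr = f.Get(ctx, nil) // child ended, for whatever reason
		restart = true             // restart CDC entirely rather than retry in place
	})
	for !restart && ctx.Err() == nil {
		selector.Select(ctx)
	}
	return restart, childErr
}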
