Skip to content

Commit

Permalink
cdc_flow: move WaitForSourceConnector into SyncFlow (#1373)
Browse files Browse the repository at this point in the history
Trying to avoid issues being ran into with one sync & resets:
```
source connector missing from CdcCache
```
  • Loading branch information
serprex authored Feb 25, 2024
1 parent 876d477 commit d614837
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 34 deletions.
16 changes: 7 additions & 9 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,23 +244,23 @@ func (a *FlowableActivity) MaintainPull(
}
}

func (a *FlowableActivity) WaitForSourceConnector(ctx context.Context, sessionID string) error {
func (a *FlowableActivity) waitForCdcCache(ctx context.Context, sessionID string) (connectors.CDCPullConnector, error) {
logger := activity.GetLogger(ctx)
attempt := 0
for {
a.CdcCacheRw.RLock()
_, ok := a.CdcCache[sessionID]
conn, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if ok {
return nil
return conn, nil
}
activity.RecordHeartbeat(ctx, "wait another second for source connector")
attempt += 1
if attempt > 2 {
logger.Info("waiting on source connector setup", slog.Int("attempt", attempt))
}
if err := ctx.Err(); err != nil {
return err
return nil, err
}
time.Sleep(time.Second)
}
Expand Down Expand Up @@ -288,11 +288,9 @@ func (a *FlowableActivity) SyncFlow(
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

a.CdcCacheRw.RLock()
srcConn, ok := a.CdcCache[sessionID]
a.CdcCacheRw.RUnlock()
if !ok {
return nil, errors.New("source connector missing from CdcCache")
srcConn, err := a.waitForCdcCache(ctx, sessionID)
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, err
Expand Down
32 changes: 7 additions & 25 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,31 +387,6 @@ func CDCFlowWorkflowWithConfig(
cfg,
sessionInfo.SessionID,
)
fSessionSetup := workflow.ExecuteActivity(
syncCtx,
flowable.WaitForSourceConnector,
sessionInfo.SessionID,
)

var sessionError error
sessionSelector := workflow.NewNamedSelector(ctx, "Session Setup")
sessionSelector.AddFuture(fMaintain, func(f workflow.Future) {
// MaintainPull should never exit without an error before this point
sessionError = f.Get(syncCtx, nil)
})
sessionSelector.AddFuture(fSessionSetup, func(f workflow.Future) {
// Happy path is waiting for this to return without error
sessionError = f.Get(syncCtx, nil)
})
sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
sessionError = ctx.Err()
})
sessionSelector.Select(ctx)
if sessionError != nil {
state.SyncFlowErrors = append(state.SyncFlowErrors, sessionError.Error())
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state)
}

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)
Expand Down Expand Up @@ -467,6 +442,13 @@ func CDCFlowWorkflowWithConfig(
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
mainLoopSelector.AddFuture(fMaintain, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
w.logger.Error("MaintainPull failed", slog.Any("error", err))
canceled = true
}
})
flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
})
Expand Down

0 comments on commit d614837

Please sign in to comment.