diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index acda08cc63..efdafbbb62 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -275,10 +275,9 @@ func (a *FlowableActivity) MaintainPull( } } -func (a *FlowableActivity) WaitForSourceConnector( - ctx context.Context, - sessionID string, -) error { +func (a *FlowableActivity) WaitForSourceConnector(ctx context.Context, sessionID string) error { + logger := activity.GetLogger(ctx) + attempt := 0 for { a.CdcCacheRw.RLock() _, ok := a.CdcCache[sessionID] @@ -287,6 +286,10 @@ func (a *FlowableActivity) WaitForSourceConnector( return nil } activity.RecordHeartbeat(ctx, "wait another second for source connector") + attempt += 1 + if attempt > 2 { + logger.Error("waiting on source connector setup", slog.Int("attempt", attempt)) + } if err := ctx.Err(); err != nil { return err }