From 6eeaeae7d39a99e25da539dd911357a7d8327401 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 21:51:50 +0530 Subject: [PATCH 1/7] make slotsize part of errgroup --- flow/activities/flowable.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 005328e169..9e20c9c5b5 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -160,38 +160,44 @@ func (a *FlowableActivity) CreateNormalizedTable( return conn.SetupNormalizedTables(config) } +func (a *FlowableActivity) handleSlotInfo(ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, peerName string) { + slotInfo, err := srcConn.GetSlotInfo(slotName) + if err != nil { + log.Warnf("warning: failed to get slot info: %v", err) + time.Sleep(30 * time.Second) + return + } + + if len(slotInfo) != 0 { + a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + } +} + func (a *FlowableActivity) recordSlotSizePeriodically( ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, done <-chan struct{}, peerName string, -) { +) error { timeout := 10 * time.Minute ticker := time.NewTicker(timeout) defer ticker.Stop() for { - slotInfo, err := srcConn.GetSlotInfo(slotName) - if err != nil { - log.Warnf("warning: failed to get slot info: %v", err) - } - - if len(slotInfo) == 0 { - continue - } - select { case <-ticker.C: - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + a.handleSlotInfo(ctx, srcConn, slotName, peerName) case <-done: - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + a.handleSlotInfo(ctx, srcConn, slotName, peerName) + case <-ctx.Done(): + log.Warn("recordSlotSize: context is done. ignoring") + time.Sleep(30 * time.Second) } ticker.Stop() ticker = time.NewTicker(timeout) } - } // StartFlow implements StartFlow. @@ -246,7 +252,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName } - go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name) + errGroup.Go(func() error { + return a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name) + }) // start a goroutine to pull records from the source errGroup.Go(func() error { return srcConn.PullRecords(&model.PullRecordsRequest{ From 8666290569b2a22896d0d705e068fc0f28de761f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 21:58:56 +0530 Subject: [PATCH 2/7] fix: exit on error --- flow/activities/flowable.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9e20c9c5b5..9d58ee9757 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -160,17 +160,22 @@ func (a *FlowableActivity) CreateNormalizedTable( return conn.SetupNormalizedTables(config) } -func (a *FlowableActivity) handleSlotInfo(ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, peerName string) { +func (a *FlowableActivity) handleSlotInfo( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) error { slotInfo, err := srcConn.GetSlotInfo(slotName) if err != nil { log.Warnf("warning: failed to get slot info: %v", err) - time.Sleep(30 * time.Second) - return + return err } if len(slotInfo) != 0 { - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) } + return nil } func (a *FlowableActivity) recordSlotSizePeriodically( @@ -188,12 +193,16 @@ func (a *FlowableActivity) recordSlotSizePeriodically( for { select { case <-ticker.C: - a.handleSlotInfo(ctx, srcConn, slotName, peerName) + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + time.Sleep(30 * time.Second) + return nil + } case <-done: a.handleSlotInfo(ctx, srcConn, slotName, peerName) + return nil case <-ctx.Done(): - log.Warn("recordSlotSize: context is done. ignoring") - time.Sleep(30 * time.Second) + return nil } ticker.Stop() ticker = time.NewTicker(timeout) From 15acf647b853389229f0df80143c83eb820af9c9 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 21:59:54 +0530 Subject: [PATCH 3/7] remove tick sleep --- flow/activities/flowable.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9d58ee9757..68ea12357e 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -195,7 +195,6 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ticker.C: err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) if err != nil { - time.Sleep(30 * time.Second) return nil } case <-done: From 087922f5cadf383c6c1ef6b61e4c061adcb4f967 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 22:00:54 +0530 Subject: [PATCH 4/7] fix: return err in selct --- flow/activities/flowable.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 68ea12357e..dcc03bbe1f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -195,10 +195,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ticker.C: err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) if err != nil { - return nil + return err } case <-done: - a.handleSlotInfo(ctx, srcConn, slotName, peerName) + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return err + } return nil case <-ctx.Done(): return nil From 6834d31e457d4cf71f4bc9468a57bbf3563c48b0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 22:02:18 +0530 Subject: [PATCH 5/7] minor change --- flow/activities/flowable.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index dcc03bbe1f..885ad38c38 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -198,11 +198,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically( return err } case <-done: - err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) - if err != nil { - return err - } - return nil + return a.handleSlotInfo(ctx, srcConn, slotName, peerName) case <-ctx.Done(): return nil } From 79fcafb56d4c367da37ebc2b0e6f2c57aab040a8 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 22:26:16 +0530 Subject: [PATCH 6/7] fix: do done before wait --- flow/activities/flowable.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 885ad38c38..961c967b30 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -304,6 +304,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } if !hasRecords { + done <- struct{}{} // wait for the pull goroutine to finish err = errGroup.Wait() if err != nil { @@ -338,7 +339,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.Warnf("failed to push records: %v", err) return nil, fmt.Errorf("failed to push records: %w", err) } - + done <- struct{}{} err = errGroup.Wait() if err != nil { return nil, fmt.Errorf("failed to pull records: %w", err) @@ -386,7 +387,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) - done <- struct{}{} return res, nil } From e6fab373c3a77ee6379c662a335118e3f39012de Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 Dec 2023 23:05:00 +0530 Subject: [PATCH 7/7] fix: no errgroup and done --- flow/activities/flowable.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 961c967b30..4a314a30a1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -182,10 +182,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, - done <-chan struct{}, peerName string, -) error { - +) { timeout := 10 * time.Minute ticker := time.NewTicker(timeout) @@ -195,12 +193,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically( case <-ticker.C: err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) if err != nil { - return err + return } - case <-done: - return a.handleSlotInfo(ctx, srcConn, slotName, peerName) case <-ctx.Done(): - return nil + return } ticker.Stop() ticker = time.NewTicker(timeout) @@ -211,12 +207,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically( func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") - done := make(chan struct{}) - defer close(done) conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) - dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -259,9 +251,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName } - errGroup.Go(func() error { - return a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name) - }) + go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) + // start a goroutine to pull records from the source errGroup.Go(func() error { return srcConn.PullRecords(&model.PullRecordsRequest{ @@ -304,7 +295,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } if !hasRecords { - done <- struct{}{} // wait for the pull goroutine to finish err = errGroup.Wait() if err != nil { @@ -339,7 +329,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.Warnf("failed to push records: %v", err) return nil, fmt.Errorf("failed to push records: %w", err) } - done <- struct{}{} + err = errGroup.Wait() if err != nil { return nil, fmt.Errorf("failed to pull records: %w", err)