From bb88bd12c40f838a19c46eaabe2bbd8c40580904 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Sun, 5 Jan 2025 21:53:47 +0530 Subject: [PATCH] [snapshot] close stream when fail to get srcConn in errgroup (#2409) If the pull connector fails to initialize for some reason while the sync side is already blocked on `chan receive`, the partClone workflow hangs indefinitely. Fixed by explicitly closing the stream before the pull-side errgroup goroutine returns. --- flow/activities/flowable.go | 4 ++++ flow/activities/flowable_core.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e867876fb..af18ba566 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -47,6 +47,10 @@ type FlowableActivity struct { OtelManager *otel_metrics.OtelManager } +type StreamCloser interface { + Close(error) +} + func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 6aee4a7e0..32b404891 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -387,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } // replicateQRepPartition replicates a QRepPartition from the source to the destination.
-func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( +func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( ctx context.Context, a *FlowableActivity, config *protos.QRepConfig, @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + stream.Close(err) return fmt.Errorf("failed to get qrep source connector: %w", err) } defer connectors.CloseConnector(ctx, srcConn)