From ec356529df8d81c0cc28776ffaa17bfdce5a5569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 7 May 2024 02:12:13 +0000 Subject: [PATCH] fix pubsub race --- flow/connectors/pubsub/pubsub.go | 2 +- flow/connectors/pubsub/qrep.go | 2 +- flow/e2e/pubsub/pubsub_test.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 842a8200a3..5cf3f579b7 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -325,10 +325,10 @@ Loop: } close(flushLoopDone) - close(publish) if err := pool.Wait(wgCtx); err != nil { return nil, err } + close(publish) topiccache.Stop(wgCtx) select { case <-wgCtx.Done(): diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go index a70ff8dbee..c6e1138304 100644 --- a/flow/connectors/pubsub/qrep.go +++ b/flow/connectors/pubsub/qrep.go @@ -110,10 +110,10 @@ Loop: } } - close(publish) if err := pool.Wait(wgCtx); err != nil { return 0, err } + close(publish) topiccache.Stop(wgCtx) select { case <-wgCtx.Done(): diff --git a/flow/e2e/pubsub/pubsub_test.go b/flow/e2e/pubsub/pubsub_test.go index 0261752b65..df6809b819 100644 --- a/flow/e2e/pubsub/pubsub_test.go +++ b/flow/e2e/pubsub/pubsub_test.go @@ -278,7 +278,6 @@ func (s PubSubSuite) TestInitialLoad() { }) require.NoError(s.t, err) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (id, val) VALUES (1, 'testval') `, srcTableName)) require.NoError(s.t, err)