From 8a166c3a95ff7276c41eca916a99c124669afdf1 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 28 Nov 2023 00:24:00 +0530 Subject: [PATCH] return instead of doing nothing --- .github/workflows/flow.yml | 2 +- flow/activities/flowable.go | 52 ++++++++++++++++++------------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index f5e081bf70..3085d14df4 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -92,7 +92,7 @@ jobs: env: PG_CDC: empty PGPASSWORD: postgres - + - name: run tests run: | gotestsum --format testname -- -p 8 ./... -timeout 2400s diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 5ee3517f27..7041c02cab 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -707,13 +707,13 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { ticker := time.NewTicker(sendTimeout) defer ticker.Stop() - peerTableExists := true pgPeers, err := getPostgresPeerConfigs(ctx) if err != nil { if strings.Contains(err.Error(), "does not exist") { - log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Now I will be sleeping repeatedly") - peerTableExists = false + log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Exiting") + return nil } + return fmt.Errorf("error getting postgres peers: %w", err) } activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") @@ -723,39 +723,37 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { log.Info("context is done, exiting wal heartbeat send loop") return nil case <-ticker.C: - if peerTableExists { - command := ` + command := ` BEGIN; DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); END; ` - // run above command for each Postgres peer - for _, pgPeer := range pgPeers { - pgConfig := pgPeer.GetPostgresConfig() - peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) - if peerErr != nil { - return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, peerErr) - } - - _, err := peerConn.Exec(ctx, command) - if err != nil { - log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err) - } - - closeErr := peerConn.Close(ctx) - if closeErr != nil { - return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, closeErr) - } - log.Infof("sent walheartbeat to peer %v", pgPeer.Name) + // run above command for each Postgres peer + for _, pgPeer := range pgPeers { + pgConfig := pgPeer.GetPostgresConfig() + peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) + if peerErr != nil { + return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", + pgPeer.Name, pgConfig.Host, peerErr) + } + + _, err := peerConn.Exec(ctx, command) + if err != nil { + log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err) + } + + closeErr := peerConn.Close(ctx) + if closeErr != nil { + return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", + pgPeer.Name, pgConfig.Host, closeErr) } + log.Infof("sent walheartbeat to peer %v", pgPeer.Name) } - ticker.Stop() - ticker = time.NewTicker(sendTimeout) } + ticker.Stop() + ticker = time.NewTicker(sendTimeout) } }