diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index b959613429..c57bd29f92 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "strings" "sync" "time" @@ -713,45 +712,40 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { log.Info("context is done, exiting wal heartbeat send loop") return nil case <-ticker.C: - peersTableExists := true pgPeers, err := getPostgresPeerConfigs(ctx) if err != nil { - if strings.Contains(err.Error(), "does not exist") { - log.Warn("[sendwalheartbeat]: warning: peers table not found. skipping walheartbeat send.") - peersTableExists = false - } - return fmt.Errorf("error getting postgres peers: %w", err) + log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." + + "Skipping walheartbeat send. error encountered: " + err.Error()) + continue } - if peersTableExists { - command := ` + command := ` BEGIN; DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); END; ` - // run above command for each Postgres peer - for _, pgPeer := range pgPeers { - pgConfig := pgPeer.GetPostgresConfig() - peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) - if peerErr != nil { - return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, peerErr) - } - - _, err := peerConn.Exec(ctx, command) - if err != nil { - log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err) - } - - closeErr := peerConn.Close(ctx) - if closeErr != nil { - return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, closeErr) - } - log.Infof("sent walheartbeat to peer %v", pgPeer.Name) + // run above command for each Postgres peer + for _, pgPeer := range pgPeers { + pgConfig := pgPeer.GetPostgresConfig() + peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) + if peerErr != nil { + return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", + pgPeer.Name, pgConfig.Host, peerErr) + } + + _, err := peerConn.Exec(ctx, command) + if err != nil { + log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err) + } + + closeErr := peerConn.Close(ctx) + if closeErr != nil { + return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", + pgPeer.Name, pgConfig.Host, closeErr) } + log.Infof("sent walheartbeat to peer %v", pgPeer.Name) } } ticker.Stop()