Skip to content

Commit

Permalink
return instead of doing nothing
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 27, 2023
1 parent 8a17bde commit 8a166c3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
env:
PG_CDC: empty
PGPASSWORD: postgres

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
Expand Down
52 changes: 25 additions & 27 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,13 +707,13 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

peerTableExists := true
pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Now I will be sleeping repeatedly")
peerTableExists = false
log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Exiting")
return nil
}
return fmt.Errorf("error getting postgres peers: %w", err)
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
Expand All @@ -723,39 +723,37 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
if peerTableExists {
command := `
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)
}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)
}
}

Expand Down

0 comments on commit 8a166c3

Please sign in to comment.