Skip to content

Commit

Permalink
Handle absence of peers table for SendWalHeartbeat (#724)
Browse files Browse the repository at this point in the history
If no peers table exists (aka nexus is not running), then flow tests may
fail at sendwalheartbeat. This PR handles that case in the
sendwalheartbeat activity.

---------

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and iskakaushik authored Nov 27, 2023
1 parent 30152d0 commit c99d0e5
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,11 @@ func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx,
"SELECT name, options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
optionRows, err := catalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
WHERE p.type = $1`, protos.DBType_POSTGRES)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -702,26 +705,27 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
return fmt.Errorf("error getting postgres peer configs: %w", err)
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
case <-ctx.Done():
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
"Skipping walheartbeat send. error encountered: " + err.Error())
continue
}

command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
Expand All @@ -743,10 +747,9 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)

}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)
}
}

Expand Down

0 comments on commit c99d0e5

Please sign in to comment.