Skip to content

Commit

Permalink
revert flow.yml and add no-peers case
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 27, 2023
1 parent c46c17d commit d3e8439
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 44 deletions.
15 changes: 1 addition & 14 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,7 @@ jobs:
env:
PG_CDC: empty
PGPASSWORD: postgres

- name: create peers table
run: |
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE TABLE IF NOT EXISTS peers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
type INTEGER NOT NULL,
options BYTEA NOT NULL
);"
working-directory: ./flow
env:
PG_CDC: empty
PGPASSWORD: postgres


- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
Expand Down
69 changes: 39 additions & 30 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -671,8 +672,11 @@ func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx,
"SELECT name, options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
optionRows, err := catalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
WHERE p.type = $1`, protos.DBType_POSTGRES)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -703,9 +707,13 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

peerTableExists := true
pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
return fmt.Errorf("error getting postgres peer configs: %w", err)
if strings.Contains(err.Error(), "does not exist") {
log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Now I will be sleeping repeatedly")
peerTableExists = false
}
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
Expand All @@ -715,37 +723,38 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
if peerTableExists {
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)

}
}
}
Expand Down

0 comments on commit d3e8439

Please sign in to comment.